Spring Boot RabbitMQ Execution of Rabbit message listener failed

RabbitMQ Spring Boot

Error message

17:19:13.691 [SimpleAsyncTaskExecutor-14] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed. 
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.example.Test.test(com.rabbitmq.client.Channel,org.springframework.amqp.core.Message) throws java.io.IOException' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
    at com.example.Test.test(Test.java:10)
    at sun.reflect.GeneratedMethodAccessor136.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
    ... 12 common frames omitted

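Symptom

Messages pile up in the MQ queue and are no longer consumed.

Cause

Manual acknowledgement (manual ack) is enabled, but the ack call is placed after the code that throws. When the exception occurs, execution jumps straight to a catch branch or the exception propagates out of the listener method, so basicAck is never called. The message stays unacknowledged, and once the number of unacknowledged messages reaches the consumer's prefetch limit the broker stops delivering new ones.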

@RabbitListener(bindings = {
        @QueueBinding(value = @Queue(value = "send_queue", durable = "true")
                , exchange = @Exchange(value = "send_exchange", type = "topic")
                , key = "send_key") })
public void sendQueue(Channel channel, Message message) throws IOException {
    String payload = new String(message.getBody(), StandardCharsets.UTF_8);
    log.info("send queue#{}", payload);

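    // Business logic; the division by zero simulates a failure,
    // so the basicAck call below is never reached
    int i = 1 / 0;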

    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
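
Why a missing ack stops consumption can be illustrated with a toy model. The sketch below is not RabbitMQ or Spring AMQP code: `PrefetchStallSketch`, `deliver`, and its parameters are hypothetical names. The only behavior it assumes is that a broker with a prefetch limit delivers a new message only while the consumer holds fewer unacknowledged messages than that limit.

```java
// Toy model of prefetch-limited delivery; illustrative only, no broker involved.
final class PrefetchStallSketch {

    // Returns how many of the queued messages get delivered before the
    // queue is empty or the prefetch window is full of unacked messages.
    // Every message is assumed to fail in the listener.
    static int deliver(int queued, int prefetch, boolean ackOnFailure) {
        int remaining = queued;
        int unacked = 0;
        int delivered = 0;
        // The modelled broker only delivers while unacked < prefetch.
        while (remaining > 0 && unacked < prefetch) {
            remaining--;
            delivered++;
            unacked++;
            if (ackOnFailure) {
                // The listener acknowledges even though processing failed.
                unacked--;
            }
            // Otherwise the exception skipped the ack and the message stays unacked.
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(10, 3, false)); // 3
        System.out.println(deliver(10, 3, true));  // 10
    }
}
```

With a prefetch of 3 and no ack on failure, delivery stops after three messages and the remaining seven stay in the queue, which matches the backlog described above.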

Solution

Wrap the business logic in try...catch, and once the exception has been handled choose whether to ack, nack, or reject the message.

@RabbitListener(bindings = {
        @QueueBinding(value = @Queue(value = "send_queue", durable = "true")
                , exchange = @Exchange(value = "send_exchange", type = "topic")
                , key = "send_key") })
public void sendQueue(Channel channel, Message message) throws IOException {
    String payload = new String(message.getBody(), StandardCharsets.UTF_8);

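    try {
        // Business logic (the division by zero simulates a failure)
        int i = 1 / 0;
    } catch (Exception e) {
        log.error("send queue error#{}", payload, e);
    }

    // Runs on both the success and the failure path, so the message is always acked
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);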
}
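
The listener above always acks, even when the business logic fails. If a failed message should be nacked instead, the same try...catch shape applies. The following is a minimal sketch under stated assumptions: `AckChannel` is a hypothetical stand-in that mirrors `basicAck(long, boolean)` and `basicNack(long, boolean, boolean)` from `com.rabbitmq.client.Channel` (without the checked `IOException`) so the example runs without a broker, and `ManualAckSketch`, `RecordingChannel`, `handle`, and `requeueOnFailure` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the acknowledgement methods of
// com.rabbitmq.client.Channel (the real ones also declare IOException).
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void basicNack(long deliveryTag, boolean multiple, boolean requeue);
}

final class ManualAckSketch {

    // Test double that records every acknowledgement it receives.
    static final class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    // Runs the business logic and sends exactly one acknowledgement on every path:
    // ack on success, nack when the business logic throws.
    static void handle(AckChannel channel, long deliveryTag, String payload,
                       Consumer<String> businessLogic, boolean requeueOnFailure) {
        try {
            businessLogic.accept(payload);
            channel.basicAck(deliveryTag, false);
        } catch (RuntimeException e) {
            channel.basicNack(deliveryTag, false, requeueOnFailure);
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        // Illustrative business logic: parsing fails for a non-numeric payload.
        Consumer<String> logic = p -> Integer.parseInt(p);
        handle(channel, 1L, "42", logic, false);
        handle(channel, 2L, "not-a-number", logic, false);
        System.out.println(channel.calls); // [ack:1, nack:2:requeue=false]
    }
}
```

In a real listener the same calls would go to the injected `Channel`. Requeueing a message that fails every time leads to endless redelivery, so passing `true` for the requeue flag only makes sense for failures that are expected to be transient.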
Views: 563 · Published 2020-12-14
