

Spring Boot RabbitMQ Execution of Rabbit message listener failed


Error message

17:19:13.691 [SimpleAsyncTaskExecutor-14] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed. 
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.example.Test.test(com.rabbitmq.client.Channel,org.springframework.amqp.core.Message) throws java.io.IOException' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
    at com.example.Test.test(Test.java:10)
    at sun.reflect.GeneratedMethodAccessor136.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
    ... 12 common frames omitted

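Symptom

Messages pile up in the queue and are no longer consumed.

Cause

The listener uses manual ack, but the ack call is placed after the code that throws. When the exception occurs, execution jumps straight to a catch branch or propagates out of the method, so basicAck is never reached and the message stays unacknowledged. Once the number of unacknowledged messages reaches the consumer's prefetch limit, the broker stops delivering new ones.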

@RabbitListener(bindings = {
        @QueueBinding(value = @Queue(value = "send_queue", durable = "true")
                , exchange = @Exchange(value = "send_exchange", type = "topic")
                , key = "send_key") })
public void sendQueue(Channel channel, Message message) throws IOException {
    String payload = new String(message.getBody(), StandardCharsets.UTF_8);
    log.info("send queue#{}", payload);

    // Business logic: the exception thrown here skips the ack below
    int i = 1 / 0;

    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
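
The stall can be illustrated with a toy model. This is only a sketch, not how the broker is actually implemented: it assumes the broker delivers messages only while the consumer holds fewer than `prefetch` unacknowledged ones. `PrefetchDemo`, `deliveredWithoutAck`, and `failingListener` are hypothetical names introduced for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PrefetchDemo {

    /** Hypothetical listener body that always fails before it could ack. */
    static void failingListener() {
        throw new ArithmeticException("/ by zero");
    }

    /** Returns how many of `total` messages are delivered when the listener never acks. */
    static int deliveredWithoutAck(int total, int prefetch) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int tag = 1; tag <= total; tag++) {
            queue.add(tag);
        }
        int unacked = 0;
        int delivered = 0;
        // Assumed rule: deliver only while the unacked count is below the prefetch limit.
        while (!queue.isEmpty() && unacked < prefetch) {
            queue.poll();
            delivered++;
            unacked++;
            try {
                failingListener();
                unacked--; // stands in for basicAck; never reached because the listener throws
            } catch (RuntimeException e) {
                // No ack, nack, or reject here, so the message stays unacknowledged.
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // 10 queued messages, prefetch limit of 3
        System.out.println(deliveredWithoutAck(10, 3)); // prints 3
    }
}
```

In this model the remaining 7 messages are never delivered, which matches the symptom of a queue that stops being consumed.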

Solution

Wrap the business logic in try...catch, and after handling the exception choose whether to ack, nack, or reject the message.

@RabbitListener(bindings = {
        @QueueBinding(value = @Queue(value = "send_queue", durable = "true")
                , exchange = @Exchange(value = "send_exchange", type = "topic")
                , key = "send_key") })
public void sendQueue(Channel channel, Message message) throws IOException {
    String payload = new String(message.getBody(), StandardCharsets.UTF_8);

    try {
        // Business logic
        int i = 1 / 0;
    } catch (Exception e) {
        log.error("send queue error#{}", payload, e);
    }

    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
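
The listener above always acks, even after a failure. The sketch below shows one possible way to choose between ack, nack, and reject instead. So that it runs without a broker, `AckChannel` is a hypothetical stand-in that copies only the `basicAck`, `basicNack`, and `basicReject` signatures from `com.rabbitmq.client.Channel`. `RecordingChannel`, `process`, and the rule for which exception counts as permanent are assumptions made for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class AckDemo {

    /** Hypothetical business logic: empty payload is a permanent error, "retry" a transient one. */
    static void process(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        if (payload.equals("retry")) {
            throw new IllegalStateException("temporary failure");
        }
    }

    /** Every path ends in exactly one of ack, nack, or reject. */
    static void handle(AckChannel channel, long deliveryTag, String payload) throws IOException {
        try {
            process(payload);
        } catch (IllegalArgumentException e) {
            // Redelivery would fail again, so reject without requeue.
            channel.basicReject(deliveryTag, false);
            return;
        } catch (RuntimeException e) {
            // Possibly transient, so nack with requeue for another attempt.
            channel.basicNack(deliveryTag, false, true);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel channel = new RecordingChannel();
        handle(channel, 1L, "hello");
        handle(channel, 2L, "");
        handle(channel, 3L, "retry");
        System.out.println(channel.calls);
        // prints [ack:1, reject:2:requeue=false, nack:3:requeue=true]
    }
}

/** Hypothetical stand-in for the acknowledgement methods of com.rabbitmq.client.Channel. */
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
}

/** Records calls instead of talking to a broker. */
class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
    }

    public void basicReject(long deliveryTag, boolean requeue) {
        calls.add("reject:" + deliveryTag + ":requeue=" + requeue);
    }
}
```

Requeueing a message that fails every time can cause an endless redelivery loop, so a retry limit or a dead-letter exchange is usually worth considering alongside requeue=true.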
6094 views · Published on 2020-12-14


https://github.com/fendoudebb
