Message remains Unack'd in the rabbit broker despite DefaultRequeueRejected=false

My scenario: I have published two messages to my Rabbit broker, and an unhandled exception occurs while the first message is being processed.

My question: Why does the message remain Unack'd in the broker, and why, as a result, is the second message not dequeued and processed?

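Some information: I am using Spring AMQP 1.5.4 and Spring Integration 4.2.4 (see the code below). I have set up a Dead Letter Exchange, and it works as expected (i.e., when I Nack a message, it is forwarded to the DLX, expires there, and is then forwarded back to the main exchange).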

What I want: I want unhandled exceptions (i.e., those caught by the SimpleMessageListenerContainer) to cause the AMQP message to be Nack'd rather than left Unack'd.

What I see: There are 3 retry attempts to process the message, which of course fail because of my forced exception (see the ErrorHandler code below).

The BlockingQueueConsumer consumer tag stays the same, so I assume the BlockingQueueConsumer has not been restarted. However, the log below shows that it does continue to wait for messages.

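I would like to know why the BlockingQueueConsumer does not nack the message, and why subsequent messages are not consumed even though the log shows that the consumer is waiting for messages.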

Any suggestions or background information would be very welcome!

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, Queue mainQueue, RetryOperationsInterceptor retryOperationsInterceptor) {
    SimpleMessageListenerContainer retVal = new SimpleMessageListenerContainer(connectionFactory);
    retVal.addQueues(mainQueue);
    retVal.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    retVal.setDefaultRequeueRejected(false);
    retVal.setAdviceChain(new Advice[]{retryOperationsInterceptor});
    return retVal;
}

@Bean
public RetryOperationsInterceptor retryOperationsInterceptor () {
    return stateless().recoverer(new RejectAndDontRequeueRecoverer()).build();
}

<int-amqp:inbound-channel-adapter
    channel="fromRabbitChannel"
    error-channel="errorChannel"
    listener-container="simpleMessageListenerContainer"
    />

<int:service-activator ref="errorHandler" input-channel="errorChannel" method="handleError"/>

@MessageEndpoint
public class ErrorHandler {
    public void handleError(Message<MessagingException> message) throws IOException {
        throw new IllegalStateException("FORCED EXCEPTION");
    }
}

09:49:38.219 [SimpleAsyncTaskExecutor-1] INFO  c.p.a.f.ErrorHandler - Throwing an exception!!
09:49:38.219 [SimpleAsyncTaskExecutor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
09:49:38.219 [SimpleAsyncTaskExecutor-1] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=3
09:49:38.220 [SimpleAsyncTaskExecutor-1] WARN  o.s.a.r.r.RejectAndDontRequeueRecoverer - Retries exhausted for message (Body:'[B@c78ef32(byte[97])'MessageProperties [blah blah])
    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) [spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) [spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) [spring-rabbit-1.5.2.RELEASE.jar:na]
....
....
09:49:38.221 [SimpleAsyncTaskExecutor-1] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:44) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:59) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:53) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:124) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:458) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:320) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:168) ~[spring-retry-1.1.2.RELEASE.jar:na]
....
....
09:49:38.222 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-XVCBQNXxCMFERaF1kbeI3Q=debitCardStatusQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5671/,1), acknowledgeMode=MANUAL local queue size=0
09:49:39.222 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-XVCBQNXxCMFERaF1kbeI3Q=debitCardStatusQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5671/,1), acknowledgeMode=MANUAL local queue size=0

retVal.setAcknowledgeMode(AcknowledgeMode.MANUAL);

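With MANUAL acks, you are responsible for acking or rejecting the message yourself; the container will only ack/nack for you if you set the mode to AUTO, and then it will do exactly what you want.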
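
As a minimal sketch of that suggestion, the container bean from the question could be switched to AUTO as shown below. The class name ListenerContainerConfig is hypothetical and only there to make the fragment self-contained; the bean method, its parameters, and the setter calls are taken from the question, with only the acknowledge mode changed. It assumes the Spring AMQP and Spring Retry versions from the question are on the classpath.

```java
import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

// Hypothetical configuration class; the name is not from the original post.
@Configuration
public class ListenerContainerConfig {

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            ConnectionFactory connectionFactory,
            Queue mainQueue,
            RetryOperationsInterceptor retryOperationsInterceptor) {
        SimpleMessageListenerContainer retVal = new SimpleMessageListenerContainer(connectionFactory);
        retVal.addQueues(mainQueue);
        // AUTO: the container acks on success and rejects on failure,
        // instead of leaving that to application code as MANUAL does.
        retVal.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Rejected messages are not requeued, so the broker can route them to the DLX.
        retVal.setDefaultRequeueRejected(false);
        retVal.setAdviceChain(new Advice[]{retryOperationsInterceptor});
        return retVal;
    }
}
```

With this change, once the retries are exhausted and RejectAndDontRequeueRecoverer throws, the container should reject the message without requeueing it rather than leaving it Unack'd. An unacknowledged delivery also counts against the consumer's prefetch limit, which would likely explain why the second message is not delivered while the first one stays Unack'd, assuming the prefetch count is 1.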