RabbitListener 异常不发送消息到死信队列
RabbitListener Exception do not send message to dead letter queue
我在 Spring 启动应用程序中有一个 RabbitListener,它会在发生异常时重试并将消息发送到死信队列,这很好。但是,如果我的消息侦听器发生一个特定的异常,我不希望它被重试并转到我的死信队列,但我仍然希望我的事务被回滚。有办法吗?
正如我在下面的代码中所述,我尝试捕获异常但消费者重新启动。如果我抛出 AmqpRejectAndDontRequeueException 消息将不会被重试,但最终会进入死信队列
这是我的侦听器方法:
@RabbitListener(queues = "#{T(com.myproject.RabbitBinding).PROCESS_MESSAGE.getQueue()}")
public void onMessage(MyMessage message)
{
if (log.isDebugEnabled())
{
log.debug("Received message: {}", message);
}
try
{
process.run(message);
}
catch (IllegalStateException e)
{
log.error("Exception occured: e", e);
}
每当我捕获到异常时,它都会输出异常,但随后消费者会重新启动,因为我看到了这条消息::
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer killOrRestart
这是我的兔子配置:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
Jackson2JsonMessageConverter messageConverter,
RetryAdviceChainFactory deadLetterRetryAdviceChainFactory,
PlatformTransactionManager transactionManager)
{
final SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
cf.setConnectionFactory(connectionFactory);
cf.setConcurrentConsumers(rabbitConcurrentConsumers);
cf.setErrorHandler(defaultErrorHandler);
cf.setAdviceChain(deadLetterRetryAdviceChainFactory.createDefaultRetryChain());
cf.setChannelTransacted(true);
cf.setTaskExecutor(taskExecutor);
cf.setMessageConverter(messageConverter);
cf.setTransactionManager(transactionManager);
return cf;
}
@Bean
protected RetryAdviceChainFactory deadLetterRetryAdviceChainFactory(
MessageRecoverer deadLetterMessageRecoverer)
{
RetryAdviceChainFactory factory = new RetryAdviceChainFactory();
factory.setMessageRecoverer(deadLetterMessageRecoverer);
return factory;
}
抛出 ImmediateAcknowledgeAmqpException。
我在 Spring 启动应用程序中有一个 RabbitListener,它会在发生异常时重试并将消息发送到死信队列,这很好。但是,如果我的消息侦听器发生一个特定的异常,我不希望它被重试并转到我的死信队列,但我仍然希望我的事务被回滚。有办法吗?
正如我在下面的代码中所述,我尝试捕获异常但消费者重新启动。如果我抛出 AmqpRejectAndDontRequeueException 消息将不会被重试,但最终会进入死信队列
这是我的侦听器方法:
@RabbitListener(queues = "#{T(com.myproject.RabbitBinding).PROCESS_MESSAGE.getQueue()}")
public void onMessage(MyMessage message)
{
if (log.isDebugEnabled())
{
log.debug("Received message: {}", message);
}
try
{
process.run(message);
}
catch (IllegalStateException e)
{
log.error("Exception occured: e", e);
}
每当我捕获到异常时,它都会输出异常,但随后消费者会重新启动,因为我看到了这条消息::
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer killOrRestart
这是我的兔子配置:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
Jackson2JsonMessageConverter messageConverter,
RetryAdviceChainFactory deadLetterRetryAdviceChainFactory,
PlatformTransactionManager transactionManager)
{
final SimpleRabbitListenerContainerFactory cf = new SimpleRabbitListenerContainerFactory();
cf.setConnectionFactory(connectionFactory);
cf.setConcurrentConsumers(rabbitConcurrentConsumers);
cf.setErrorHandler(defaultErrorHandler);
cf.setAdviceChain(deadLetterRetryAdviceChainFactory.createDefaultRetryChain());
cf.setChannelTransacted(true);
cf.setTaskExecutor(taskExecutor);
cf.setMessageConverter(messageConverter);
cf.setTransactionManager(transactionManager);
return cf;
}
@Bean
protected RetryAdviceChainFactory deadLetterRetryAdviceChainFactory(
MessageRecoverer deadLetterMessageRecoverer)
{
RetryAdviceChainFactory factory = new RetryAdviceChainFactory();
factory.setMessageRecoverer(deadLetterMessageRecoverer);
return factory;
}
抛出 ImmediateAcknowledgeAmqpException。