Spring AMQP @RabbitListener 业务错误自定义重试最佳实践
Spring AMQP @RabbitListener custom retry on business error best practice
以下场景:我有一个 @RabbitListener
从 RabbitMQ 获取消息。有时消息会出错,因为我找不到相关的业务对象。在这种情况下,我无法回复发件人,所以我只想在重试一定次数后忽略此消息。
我的解决方案:在我的 @RabbitListener
中,每当无法找到业务对象时,我都会抛出自定义运行时异常。在我的配置中,我有一个具有最大尝试次数的 RetryOperationsInterceptor
和一个自定义恢复器。
处理此类情况的最佳做法是什么?当有多个 @RabbitListener
时,我可以配置不同的恢复器 class 吗?
查看我的配置:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(new CustomMessageConverter());
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(20);
Advice[] adviceChain = new Advice[] { interceptor() };
factory.setAdviceChain(adviceChain);
return factory;
}
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new CustomRejectAndRecoverer())
.build();
}
这是我的 CustomRejectAndRecoverer
:
public class CustomRejectAndRecoverer implements MessageRecoverer {
@Override
public void recover(Message message, Throwable cause) {
if (ExceptionUtils.getRootCause(cause) instanceof BusinessObjectNotFoundRuntimeException) {
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(cause), message);
}
}
}
如果来自共享 SimpleRabbitListenerContainerFactory
的通用逻辑不符合您对特定 @RabbitListener
的要求,您必须使用这些自定义选项声明一个新逻辑,例如关于此事的新 MessageRecoverer
。
为此,@RabbitListener
具有属性:
/**
* The bean name of the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* bean name.
*/
String containerFactory() default "";
您当前需要针对每个不同的重试配置使用不同的容器工厂。
在 2.0 中,我们向注解添加了一个新的 errorHandler
属性,因此每个侦听器都可以有一个自定义的错误处理程序,而不管它是由哪个容器工厂创建的。
这是在first milestone release;当前的里程碑是 M2,M3 将很快推出。预计 6 月正式发布。
以下场景:我有一个 @RabbitListener
从 RabbitMQ 获取消息。有时消息会出错,因为我找不到相关的业务对象。在这种情况下,我无法回复发件人,所以我只想在重试一定次数后忽略此消息。
我的解决方案:在我的 @RabbitListener
中,每当无法找到业务对象时,我都会抛出自定义运行时异常。在我的配置中,我有一个具有最大尝试次数的 RetryOperationsInterceptor
和一个自定义恢复器。
处理此类情况的最佳做法是什么?当有多个 @RabbitListener
时,我可以配置不同的恢复器 class 吗?
查看我的配置:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(new CustomMessageConverter());
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(20);
Advice[] adviceChain = new Advice[] { interceptor() };
factory.setAdviceChain(adviceChain);
return factory;
}
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new CustomRejectAndRecoverer())
.build();
}
这是我的 CustomRejectAndRecoverer
:
public class CustomRejectAndRecoverer implements MessageRecoverer {
@Override
public void recover(Message message, Throwable cause) {
if (ExceptionUtils.getRootCause(cause) instanceof BusinessObjectNotFoundRuntimeException) {
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(cause), message);
}
}
}
如果来自共享 SimpleRabbitListenerContainerFactory
的通用逻辑不符合您对特定 @RabbitListener
的要求,您必须使用这些自定义选项声明一个新逻辑,例如关于此事的新 MessageRecoverer
。
为此,@RabbitListener
具有属性:
/**
* The bean name of the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* bean name.
*/
String containerFactory() default "";
您当前需要针对每个不同的重试配置使用不同的容器工厂。
在 2.0 中,我们向注解添加了一个新的 errorHandler
属性,因此每个侦听器都可以有一个自定义的错误处理程序,而不管它是由哪个容器工厂创建的。
这是在first milestone release;当前的里程碑是 M2,M3 将很快推出。预计 6 月正式发布。