Spring AMQP @RabbitListener 业务错误自定义重试最佳实践

Spring AMQP @RabbitListener custom retry on business error best practice

以下场景:我有一个 @RabbitListener 从 RabbitMQ 获取消息。有时消息会出错,因为我找不到相关的业务对象。在这种情况下,我无法回复发件人,所以我只想在重试一定次数后忽略此消息。

我的解决方案:在我的 @RabbitListener 中,每当无法找到业务对象时,我都会抛出自定义运行时异常。在我的配置中,我有一个具有最大尝试次数的 RetryOperationsInterceptor 和一个自定义恢复器。

处理此类情况的最佳做法是什么?当有多个 @RabbitListener 时,我可以配置不同的恢复器 class 吗?

查看我的配置:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(new CustomMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    factory.setConcurrentConsumers(1);
    factory.setMaxConcurrentConsumers(20);

    Advice[] adviceChain = new Advice[] { interceptor() };
    factory.setAdviceChain(adviceChain);
    return factory;
}

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new CustomRejectAndRecoverer())
            .build();
}

这是我的 CustomRejectAndRecoverer:

public class CustomRejectAndRecoverer implements MessageRecoverer {

    @Override
    public void recover(Message message, Throwable cause) {
        if (ExceptionUtils.getRootCause(cause) instanceof BusinessObjectNotFoundRuntimeException) {
            throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                    new AmqpRejectAndDontRequeueException(cause), message);
        }
    }
}

如果来自共享 SimpleRabbitListenerContainerFactory 的通用逻辑不符合您对特定 @RabbitListener 的要求,您必须使用这些自定义选项声明一个新逻辑,例如关于此事的新 MessageRecoverer

为此,@RabbitListener 具有属性:

/**
 * The bean name of the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
 * to use to create the message listener container responsible to serve this endpoint.
 * <p>If not specified, the default container factory is used, if any.
 * @return the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
 * bean name.
 */
String containerFactory() default "";

您当前需要针对每个不同的重试配置使用不同的容器工厂。

2.0 中,我们向注解添加了一个新的 errorHandler 属性,因此每个侦听器都可以有一个自定义的错误处理程序,而不管它是由哪个容器工厂创建的。

这是在first milestone release;当前的里程碑是 M2,M3 将很快推出。预计 6 月正式发布。