Spring Rabbit retries delivery of a rejected message. Is that OK?

I have the following configuration:

spring.rabbitmq.listener.prefetch=1
spring.rabbitmq.listener.concurrency=1
spring.rabbitmq.listener.retry.enabled=true
spring.rabbitmq.listener.retry.max-attempts=3
spring.rabbitmq.listener.retry.max-interval=1000
# I have also changed this to true, but the same behavior still happens
spring.rabbitmq.listener.default-requeue-rejected=false

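In my listener I throw AmqpRejectAndDontRequeueException to reject the message and force RabbitMQ not to redeliver it, but the message is redelivered 3 times and only then routed to the dead letter queue.

Is this the standard behavior for the configuration I have provided, or am I missing something?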

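You have to configure the retry policy to not retry for that exception.

You can't do that with properties; you have to configure the retry advice yourself.

I'll post an example later if you need help with that.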

requeue-rejected is applied at the container level (below retry on the stack).
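To make the layering concrete, here is a minimal plain-Java sketch of a retry wrapper sitting above the reject step. It is a simplified model, not Spring's code: `RetryLayersSketch`, `deliver`, and the `RejectAndDontRequeue` stand-in exception are hypothetical names used so the example runs without any Spring jars.

```java
import java.util.function.Predicate;

final class RetryLayersSketch {

    /** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    /**
     * Models retry advice wrapped around a listener: the listener is called again
     * while attempts remain and the policy considers the exception retryable.
     * Returns how many times the listener was invoked.
     */
    static int deliver(Runnable listener, int maxAttempts, Predicate<RuntimeException> retryable) {
        int calls = 0;
        while (true) {
            calls++;
            try {
                listener.run();
                return calls; // success: nothing to reject
            } catch (RuntimeException e) {
                if (calls >= maxAttempts || !retryable.test(e)) {
                    // Only at this point would the container-level reject
                    // (and dead-lettering, if configured) come into play.
                    return calls;
                }
            }
        }
    }

    public static void main(String[] args) {
        Runnable listener = () -> {
            throw new RejectAndDontRequeue("foo");
        };

        // A policy that treats every exception as retryable: 3 invocations.
        System.out.println(deliver(listener, 3, e -> true)); // prints 3

        // A policy that excludes the reject exception: 1 invocation.
        System.out.println(deliver(listener, 3, e -> !(e instanceof RejectAndDontRequeue))); // prints 1
    }
}
```

In this model the exception type only matters if the retry policy inspects it, which is why the reject exception alone does not stop the retries.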

EDIT

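// Imports assume the Spring Boot 1.x property layout used in the question (spring.rabbitmq.listener.*).
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;

@SpringBootApplication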
public class So39853762Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39853762Application.class, args);
        Thread.sleep(60000);
        context.close();
    }

    @RabbitListener(queues = "foo")
    public void foo(String foo) {
        System.out.println(foo);
        if ("foo".equals(foo)) {
            throw new AmqpRejectAndDontRequeueException("foo"); // won't be retried.
        }
        else {
            throw new IllegalStateException("bar"); // will be retried
        }
    }

    @Bean
    public ListenerRetryAdviceCustomizer retryCustomizer(SimpleRabbitListenerContainerFactory containerFactory,
            RabbitProperties rabbitPropeties) {
        return new ListenerRetryAdviceCustomizer(containerFactory, rabbitPropeties);
    }

    public static class ListenerRetryAdviceCustomizer implements InitializingBean {

        private final SimpleRabbitListenerContainerFactory containerFactory;

        private final RabbitProperties rabbitPropeties;

        public ListenerRetryAdviceCustomizer(SimpleRabbitListenerContainerFactory containerFactory,
                RabbitProperties rabbitPropeties) {
            this.containerFactory = containerFactory;
            this.rabbitPropeties = rabbitPropeties;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            ListenerRetry retryConfig = this.rabbitPropeties.getListener().getRetry();
            if (retryConfig.isEnabled()) {
                RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
                        ? RetryInterceptorBuilder.stateless()
                        : RetryInterceptorBuilder.stateful());
                Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
                retryableExceptions.put(AmqpRejectAndDontRequeueException.class, false);
                retryableExceptions.put(IllegalStateException.class, true);
                SimpleRetryPolicy policy =
                        new SimpleRetryPolicy(retryConfig.getMaxAttempts(), retryableExceptions, true);
                ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
                backOff.setInitialInterval(retryConfig.getInitialInterval());
                backOff.setMultiplier(retryConfig.getMultiplier());
                backOff.setMaxInterval(retryConfig.getMaxInterval());
                builder.retryPolicy(policy)
                    .backOffPolicy(backOff)
                    .recoverer(new RejectAndDontRequeueRecoverer());
                this.containerFactory.setAdviceChain(builder.build());
            }
        }

    }

}

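NOTE: You currently can't configure a policy to retry all exceptions "except" this one; you have to classify every exception you want retried (and they can't be a superclass of AmqpRejectAndDontRequeueException). I have opened an issue to support this.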
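The back-off values copied from the properties in the customizer above control the pause between attempts. As a rough model only (not Spring Retry's source), an exponential back-off starts at the initial interval, multiplies it after each pause, and caps it at the max interval. `BackOffSketch` and the numbers below are illustrative assumptions, not values taken from this thread.

```java
import java.util.ArrayList;
import java.util.List;

final class BackOffSketch {

    /** Returns the pause (in ms) before each retry: initial, initial * multiplier, ... capped at max. */
    static List<Long> intervals(long initial, double multiplier, long max, int pauses) {
        List<Long> result = new ArrayList<>();
        double current = initial;
        for (int i = 0; i < pauses; i++) {
            result.add((long) Math.min(current, max));
            current = current * multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed values: 200 ms initial interval, multiplier 2.0, 1000 ms cap.
        System.out.println(intervals(200, 2.0, 1000, 4)); // prints [200, 400, 800, 1000]
    }
}
```

With max-attempts=3 there are at most two pauses, so only the first two values of such a sequence would apply.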

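With Spring Boot 2.3.5 and Spring AMQP Starter 2.2.12, the other answers posted here did not work for me, but with these versions I was able to customize the retry policy so that it does not retry AmqpRejectAndDontRequeueException: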

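import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.RabbitRetryTemplateCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.policy.SimpleRetryPolicy;

@Configuration
public class RabbitConfiguration {

    @Bean
    public RabbitRetryTemplateCustomizer customizeRetryPolicy(
            @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") int maxAttempts) {
        // Arguments: max attempts, per-exception rules, traverseCauses = true, defaultValue = true.
        SimpleRetryPolicy policy = new SimpleRetryPolicy(maxAttempts, Map.of(AmqpRejectAndDontRequeueException.class, false), true, true);
        return (target, retryTemplate) -> retryTemplate.setRetryPolicy(policy);
    }

}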

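This makes the retry policy skip retries for AmqpRejectAndDontRequeueException while retrying all other exceptions as usual.

Configured this way, it traverses the exception causes and skips the retry if it finds an AmqpRejectAndDontRequeueException.

Traversing the causes is necessary because org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter#invokeHandler wraps all exceptions in a ListenerExecutionFailedException.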
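The effect of cause traversal can be illustrated with a small plain-Java model. This is a simplified sketch, not Spring Retry's classifier: it matches exact classes only, and `CauseTraversalSketch`, `isRetryable`, `ListenerFailed`, and `RejectAndDontRequeue` are hypothetical stand-ins for the real types.

```java
import java.util.Map;

final class CauseTraversalSketch {

    /** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the wrapper exception added around listener failures. */
    static class ListenerFailed extends RuntimeException {
        ListenerFailed(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Returns the first explicit rule found for the thrown exception or, when
     * traverseCauses is true, for any exception in its cause chain.
     * Falls back to defaultValue when no rule matches.
     */
    static boolean isRetryable(Throwable thrown, Map<Class<? extends Throwable>, Boolean> rules,
            boolean traverseCauses, boolean defaultValue) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            Boolean explicit = rules.get(t.getClass());
            if (explicit != null) {
                return explicit;
            }
            if (!traverseCauses) {
                break; // only the outermost exception is inspected
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> rules = Map.of(RejectAndDontRequeue.class, false);
        Throwable rejected = new ListenerFailed(new RejectAndDontRequeue("foo"));
        Throwable other = new ListenerFailed(new IllegalStateException("bar"));

        System.out.println(isRetryable(rejected, rules, true, true));  // prints false
        System.out.println(isRetryable(other, rules, true, true));     // prints true
        System.out.println(isRetryable(rejected, rules, false, true)); // prints true (wrapper hides the cause)
    }
}
```

The last call shows why traversal matters in this model: without it, only the wrapper is classified and the rejected message would still be retried.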