Rabbitmq + Spring 引导:发送损坏消息之间的延迟

Rabbit MQ + Spring Boot: delay between resend broken messages

我正在使用 Spring Boot with RabbitMQ 创建应用程序。 我已经为 Rabbit 创建了这样的配置:

@Configuration
public class RabbitConfiguration {
    public static final String RESEND_DISPOSAL_QUEUE = "RESEND_DISPOSAL";

    @Bean
    public Queue resendDisposalQueue() {
        return new Queue(RESEND_DISPOSAL_QUEUE, true);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory (ConnectionFactory connectionFactoryr) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }
}

我还为 Rabbit 消息创建了监听器,如下所示:

@RabbitListener(queues = RESEND_DISPOSAL_QUEUE)
public void getResendDisposalPayload(String messageBody){
    LOGGER.info("[getResendDisposalPayload] message = {}", messageBody);
    // And there is some business logic
}

一切正常,但有一个问题。 当我在侦听 RESEND_DISPOSAL_QUEUE 队列的方法 getResendDisposalPayload 中遇到异常时(例如数据库的临时问题),Rabbit 开始重新发送最后一个未处理的消息,没有任何延迟。它会产生大量日志,并且出于某种原因让我的系统不舒服。

正如我在这篇文章中读到的那样 https://www.baeldung.com/spring-amqp-exponential-backoff“虽然使用死信队列是处理失败消息的标准方法”。 为了使用此模式,我必须创建 RetryOperationsInterceptor,它定义了传递消息的计数尝试和尝试之间的延迟。 例如:

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .backOffOptions(1000, 3.0, 10000)
            .maxAttempts(3)
            .recoverer(messageRecoverer)
            .build();
}

听起来不错,但只有一个问题:我无法在选项 maxAttempts 中定义无限尝试次数。 在 maxAttempts 之后,我必须将损坏的消息保存在某个地方并在将来处理它。它需要一些额外的代码。

问题是:是否有任何方法可以将 Rabbit 配置为无限地重新发送损坏的消息并延迟一些时间,比如延迟一秒?

Rabbit starts resend last not processed message without any delay

这就是重新投递的工作原理:它一次又一次地重新推送相同的消息,直到您手动确认或完全放弃。重新传递之间没有延迟,只是因为在完成此消息之前不会从队列中拉出新消息。

I can't define infinity attempt amount in options maxAttempts

您尝试过 Integer.MAX_VALUE 吗?相当不错的尝试次数。

另一种方法是使用延迟交换:https://docs.spring.io/spring-amqp/docs/current/reference/html/#delayed-message-exchange

您可以使用 RepublishMessageRecoverer 配置重试,以便在一些尝试用完后发布到您的原始队列:https://docs.spring.io/spring-amqp/docs/current/reference/html/#async-listeners