在 Spring RabbitMQ 中,我抛出 AmqpRejectAndDontRequeueException 但消息仍然重新排队

In Spring RabbitMQ I throw AmqpRejectAndDontRequeueException but message still requeue

我的服务监听 RabbitMQ 队列。我在消费者端配置重试策略。当我抛出异常时,所有死信消息都会重新排队。但是取决于我的业务逻辑,在抛出 StopRequeueException(除 SmsException 之外的每个异常)之后,我想停止重试此消息。但是消息仍然重新排队。 这是我的配置

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-attempts: 10
          max-interval: 12s
          multiplier: 2
        missing-queues-fatal: false 
if (!checkMobileService.isMobileNumberAdmitted(mobileNumber())) {
    throw new StopRequeueException("SMS_BIMTEK.MOBILE_NUMBER_IS_NOT_ADMITTED");
}

我的错误处理程序:

public class CustomErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        if (!(t.getCause() instanceof SmsException)) {
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
        }
    }
}

调用错误处理程序不在重试范围内;重试次数耗尽后调用。

您需要在重试级别分类哪些异常是可重试的,并在恢复器中进行转换。

这是一个例子:

@SpringBootApplication
public class So67406799Application {

    public static void main(String[] args) {
        SpringApplication.run(So67406799Application.class, args);
    }

    @Bean
    public RabbitRetryTemplateCustomizer customizer(
            @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") int attempts) {

        return (target, template) -> template.setRetryPolicy(new SimpleRetryPolicy(attempts,
                Map.of(StopRequeueException.class, false), true, true));
    }

    @Bean
    MessageRecoverer recoverer() {
        return (msg, cause) -> {
            throw new AmqpRejectAndDontRequeueException("Stop requeue after " +
                    RetrySynchronizationManager.getContext().getRetryCount() + " attempts");
        };
    }

    @RabbitListener(queues = "so67406799")
    void listen(String in) {
        System.out.println(in);
        if (in.equals("dontRetry")) {
            throw new StopRequeueException("test");
        }
        throw new RuntimeException("test");
    }

    @Bean
    Queue queue() {
        return new Queue("so67406799");
    }

}

@SuppressWarnings("serial")
class StopRequeueException extends NestedRuntimeException {

    public StopRequeueException(String msg) {
        super(msg);
    }

}

编辑

自定义程序被Spring Boot 调用一次;在设置重试策略和退避策略后调用它。参见 RetryTemplateFactory

在这种情况下,定制器将重试策略替换为带有异常分类器的新策略(这就是我们需要在此处注入最大尝试次数的原因)。

参见 SimpleRetryPolicy 构造函数。

    /**
     * Create a {@link SimpleRetryPolicy} with the specified number of retry attempts. If
     * traverseCauses is true, the exception causes will be traversed until a match is
     * found. The default value indicates whether to retry or not for exceptions (or super
     * classes) are not found in the map.
     * @param maxAttempts the maximum number of attempts
     * @param retryableExceptions the map of exceptions that are retryable based on the
     * map value (true/false).
     * @param traverseCauses true to traverse the exception cause chain until a classified
     * exception is found or the root cause is reached.
     * @param defaultValue the default action.
     */
    public SimpleRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
            boolean traverseCauses, boolean defaultValue) {

上面配置中的最后一个布尔值 (true) 是默认行为(重试映射中不存在的异常),第三个 (true) 告诉策略遵循原因链来查找异常(比如您在错误处理程序中的 getCause())。地图 <Exception, Boolean> 表示不要重试这个。

您也可以以相反的方式配置它(映射值中的默认值 falsetrue),明确说明您想要重试哪些异常,而不要重试所有其他异常。

为所有异常调用 MessageRecoverer,或者立即为分类异常调用,或者当其他异常用尽重试时调用。