记录 Spring AMQP 的消息侦听器抛出的异常

Logging exceptions thrown by message listeners for Spring AMQP

我有一个使用异步使用者(通过@RabbitListener)处理消息的应用程序。在这个消费者方法中,发生异常,并且由于我定义的策略,消息被重新排队:

spring:
    rabbitmq:
        listener:
            simple:
                default-requeue-rejected: false
                retry:
                    enabled: true
                    max-attempts: 10
                    initial-interval: 60000 # a minute
                    multiplier: 2
                    max-interval: 600000 # 10 minutes

消费者方法调用一个私有方法,该方法递归地从数据库中获取数据并使用 RabbitTemplate 推入队列。我预计此队列中大约有 200 条消息,但它上升到大约 700k,然后由于重试策略耗尽,消费者线程停止。

问题是我找不到任何地方记录异常,因此我无法理解是业务逻辑的哪一部分导致了这个问题。我可能会尝试将整个函数放入 try/catch 块中,并在将其重新抛出用于 Spring AMQP 的异常处理之前记录问题,但我想知道是否存在更好的方法。

我的项目有以下依赖项:

Spring 开机: 1.5.9.RELEASE
Spring AMQP: 1.7.4.RELEASE
RabbitMQ:3.7.2

我们可能应该尝试使添加 RetryListener 变得更容易一些,但您现在可以通过如下方式替换重试拦截器来做到这一点...

@SpringBootApplication
public class So48331502Application {

    private static final Logger logger = LoggerFactory.getLogger(So48331502Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So48331502Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitListenerEndpointRegistry registry,
            RabbitProperties properties, Advice interceptor) {
        return args -> {
            ListenerRetry retry = properties.getListener().getSimple().getRetry();
            if (retry.isEnabled()) {
                SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) registry
                        .getListenerContainer("myListener");
                container.setAdviceChain(interceptor);
            }
            registry.start();
        };
    }

    @Bean
    public StatelessRetryOperationsInterceptorFactoryBean interceptor(RabbitProperties properties) {
        ListenerRetry retry = properties.getListener().getSimple().getRetry();
        RetryTemplate retryTemplate = new RetryTemplate();
        RetryPolicy retryPolicy = new SimpleRetryPolicy(retry.getMaxAttempts());
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(retry.getInitialInterval());
        backOffPolicy.setMultiplier(retry.getMultiplier());
        backOffPolicy.setMaxInterval(retry.getMaxInterval());
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.registerListener(
            new RetryListener() {

                @Override
                public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                    return true;
                }

                @Override
                public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
                    if (throwable != null) {
                        logger.info("Failed: Retry count " + context.getRetryCount(), throwable);
                    }
                }

                @Override
                public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
                    logger.info("Retry count " + context.getRetryCount(), throwable);
                }
            });
        StatelessRetryOperationsInterceptorFactoryBean interceptor =
                new StatelessRetryOperationsInterceptorFactoryBean();
        interceptor.setRetryOperations(retryTemplate);
        return interceptor;
    }

    @RabbitListener(id="myListener", queues = "one")
    public void in(Object in) {
        throw new RuntimeException();
    }

}

请注意,您必须将 auto-startup 设置为 false,以便您可以更改建议链...

spring:
  rabbitmq:
    listener:
      simple:
        auto-startup: 'false'
        retry:
          enabled: 'true'

然后启动注册表,这将启动所有容器。