使用反应式 spring-amqp 时业务异常的指数退避?

Exponential backoff for business exceptions when using reactive spring-amqp?

我正在使用 Spring AMQP 2.1.6 和 Spring Boot 2.1.5,我正在寻找配置 spring-amqp 以重试业务异常的推荐方法对于具有指数退避的无功组件 (Mono)。例如:

@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
    Mono<Void> mono = myService.doSomething(myMessage);
    return mono;
}

我希望 spring-amqp 在 doSomething returns 出错时自动重试。通常可以在设置容器工厂时配置它来阻止 RabbitListener:

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setAdviceChain(retryInterceptor(..));

其中 retryInterceptor 可能定义如下:

private static RetryOperationsInterceptor retryInterceptor(long backoffInitialInterval, double backoffMultiplier, long backoffMaxInterval, int maxAttempts) {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(backoffInitialInterval);
    backOffPolicy.setMultiplier(backoffMultiplier);
    backOffPolicy.setMaxInterval(backoffMaxInterval);

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy((new SimpleRetryPolicy(maxAttempts)));
    retryTemplate.setBackOffPolicy(backOffPolicy);

    StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
    bean.setRetryOperations(retryTemplate);
    return bean.getObject();
}

但建议链似乎并未用于反应式 RabbitListener。这可能是因为,如果我理解正确的话,RetryTemplate/ExponentialBackOffPolicy 实际上会阻塞线程。

作为解决方法,我当然可以做类似的事情(切换到 Kotlin,因为它更容易一些):

@RabbitListener
fun myListener(MyMessage myMessage) : Mono<Void> {
    return myService.doSomething(myMessage)
                    .retryExponentialBackoff(10, Duration.ofMillis(100), Duration.ofSeconds(5)) { ctx ->
            log.info("Caught exception ${ctx.exception()}")
        }
}

但我希望将此重试逻辑应用于从 RabbitListener 返回的所有 Mono 实例。这样的事情是可能的还是你应该在使用项目反应器的反应序列和 spring-amqp 时以另一种方式配置它?

将重试逻辑应用到您的反应序列中确实更好,类似于您对 retryExponentialBackoff() 的处理方式。仅仅因为 Reactive Streams 的执行不在同一个线程上,我们可以将 Retrytemplate 应用于 myListener().

现在内部逻辑是这样的:

private static class MonoHandler {

    static boolean isMono(Object result) {
        return result instanceof Mono;
    }

    @SuppressWarnings("unchecked")
    static void subscribe(Object returnValue, Consumer<? super Object> success,
            Consumer<? super Throwable> failure) {

        ((Mono<? super Object>) returnValue).subscribe(success, failure);
    }

}

Consumer<? super Throwable> failure 这样做:

private void asyncFailure(Message request, Channel channel, Throwable t) {
    this.logger.error("Future or Mono was completed with an exception for " + request, t);
    try {
        channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
    }
    catch (IOException e) {
        this.logger.error("Failed to nack message", e);
    }
} 

所以,我们根本没有任何方法来启动 RetryTemplate,但同时通过显式 basicNack() 我们可以自然地重试 re-fetching 从 RabbitMQ 返回相同的消息。

我们可能会在内部为 Mono 应用 Reactor 重试,但看起来 RetryOperationsInterceptor 不能简单地转换为 Mono.retry()

所以,换句话说,RetryOperationsInterceptor 是反应式处理的错误方式。在您自己的代码中明确使用 Mono.retry()

只要您对 @RabbitListener 方法有反应性 return,您就可以公开一些常用实用方法并将其作为 Mono.transform(Function<? super Mono<T>, ? extends Publisher<V>> transformer) 应用。