使用反应式 spring-amqp 时业务异常的指数退避?
Exponential backoff for business exceptions when using reactive spring-amqp?
我正在使用 Spring AMQP 2.1.6 和 Spring Boot 2.1.5,我正在寻找配置 spring-amqp 以重试业务异常的推荐方法对于具有指数退避的无功组件 (Mono
)。例如:
@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
Mono<Void> mono = myService.doSomething(myMessage);
return mono;
}
我希望 spring-amqp 在 doSomething
returns 出错时自动重试。通常可以在设置容器工厂时配置它来阻止 RabbitListener:
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setAdviceChain(retryInterceptor(..));
其中 retryInterceptor
可能定义如下:
private static RetryOperationsInterceptor retryInterceptor(long backoffInitialInterval, double backoffMultiplier, long backoffMaxInterval, int maxAttempts) {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(backoffInitialInterval);
backOffPolicy.setMultiplier(backoffMultiplier);
backOffPolicy.setMaxInterval(backoffMaxInterval);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy((new SimpleRetryPolicy(maxAttempts)));
retryTemplate.setBackOffPolicy(backOffPolicy);
StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
bean.setRetryOperations(retryTemplate);
return bean.getObject();
}
但建议链似乎并未用于反应式 RabbitListener。这可能是因为,如果我理解正确的话,RetryTemplate
/ExponentialBackOffPolicy
实际上会阻塞线程。
作为解决方法,我当然可以做类似的事情(切换到 Kotlin,因为它更容易一些):
@RabbitListener
fun myListener(MyMessage myMessage) : Mono<Void> {
return myService.doSomething(myMessage)
.retryExponentialBackoff(10, Duration.ofMillis(100), Duration.ofSeconds(5)) { ctx ->
log.info("Caught exception ${ctx.exception()}")
}
}
但我希望将此重试逻辑应用于从 RabbitListener
返回的所有 Mono
实例。这样的事情是可能的还是你应该在使用项目反应器的反应序列和 spring-amqp 时以另一种方式配置它?
将重试逻辑应用到您的反应序列中确实更好,类似于您对 retryExponentialBackoff()
的处理方式。仅仅因为 Reactive Streams 的执行不在同一个线程上,我们可以将 Retrytemplate
应用于 myListener()
.
现在内部逻辑是这样的:
private static class MonoHandler {
static boolean isMono(Object result) {
return result instanceof Mono;
}
@SuppressWarnings("unchecked")
static void subscribe(Object returnValue, Consumer<? super Object> success,
Consumer<? super Throwable> failure) {
((Mono<? super Object>) returnValue).subscribe(success, failure);
}
}
Consumer<? super Throwable> failure
这样做:
private void asyncFailure(Message request, Channel channel, Throwable t) {
this.logger.error("Future or Mono was completed with an exception for " + request, t);
try {
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
}
catch (IOException e) {
this.logger.error("Failed to nack message", e);
}
}
所以,我们根本没有任何方法来启动 RetryTemplate
,但同时通过显式 basicNack()
我们可以自然地重试 re-fetching 从 RabbitMQ 返回相同的消息。
我们可能会在内部为 Mono
应用 Reactor 重试,但看起来 RetryOperationsInterceptor
不能简单地转换为 Mono.retry()
。
所以,换句话说,RetryOperationsInterceptor
是反应式处理的错误方式。在您自己的代码中明确使用 Mono.retry()
。
只要您对 @RabbitListener
方法有反应性 return,您就可以公开一些常用实用方法并将其作为 Mono.transform(Function<? super Mono<T>, ? extends Publisher<V>> transformer)
应用。
我正在使用 Spring AMQP 2.1.6 和 Spring Boot 2.1.5,我正在寻找配置 spring-amqp 以重试业务异常的推荐方法对于具有指数退避的无功组件 (Mono
)。例如:
@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
Mono<Void> mono = myService.doSomething(myMessage);
return mono;
}
我希望 spring-amqp 在 doSomething
returns 出错时自动重试。通常可以在设置容器工厂时配置它来阻止 RabbitListener:
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setAdviceChain(retryInterceptor(..));
其中 retryInterceptor
可能定义如下:
private static RetryOperationsInterceptor retryInterceptor(long backoffInitialInterval, double backoffMultiplier, long backoffMaxInterval, int maxAttempts) {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(backoffInitialInterval);
backOffPolicy.setMultiplier(backoffMultiplier);
backOffPolicy.setMaxInterval(backoffMaxInterval);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy((new SimpleRetryPolicy(maxAttempts)));
retryTemplate.setBackOffPolicy(backOffPolicy);
StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
bean.setRetryOperations(retryTemplate);
return bean.getObject();
}
但建议链似乎并未用于反应式 RabbitListener。这可能是因为,如果我理解正确的话,RetryTemplate
/ExponentialBackOffPolicy
实际上会阻塞线程。
作为解决方法,我当然可以做类似的事情(切换到 Kotlin,因为它更容易一些):
@RabbitListener
fun myListener(MyMessage myMessage) : Mono<Void> {
return myService.doSomething(myMessage)
.retryExponentialBackoff(10, Duration.ofMillis(100), Duration.ofSeconds(5)) { ctx ->
log.info("Caught exception ${ctx.exception()}")
}
}
但我希望将此重试逻辑应用于从 RabbitListener
返回的所有 Mono
实例。这样的事情是可能的还是你应该在使用项目反应器的反应序列和 spring-amqp 时以另一种方式配置它?
将重试逻辑应用到您的反应序列中确实更好,类似于您对 retryExponentialBackoff()
的处理方式。仅仅因为 Reactive Streams 的执行不在同一个线程上,我们可以将 Retrytemplate
应用于 myListener()
.
现在内部逻辑是这样的:
private static class MonoHandler {
static boolean isMono(Object result) {
return result instanceof Mono;
}
@SuppressWarnings("unchecked")
static void subscribe(Object returnValue, Consumer<? super Object> success,
Consumer<? super Throwable> failure) {
((Mono<? super Object>) returnValue).subscribe(success, failure);
}
}
Consumer<? super Throwable> failure
这样做:
private void asyncFailure(Message request, Channel channel, Throwable t) {
this.logger.error("Future or Mono was completed with an exception for " + request, t);
try {
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
}
catch (IOException e) {
this.logger.error("Failed to nack message", e);
}
}
所以,我们根本没有任何方法来启动 RetryTemplate
,但同时通过显式 basicNack()
我们可以自然地重试 re-fetching 从 RabbitMQ 返回相同的消息。
我们可能会在内部为 Mono
应用 Reactor 重试,但看起来 RetryOperationsInterceptor
不能简单地转换为 Mono.retry()
。
所以,换句话说,RetryOperationsInterceptor
是反应式处理的错误方式。在您自己的代码中明确使用 Mono.retry()
。
只要您对 @RabbitListener
方法有反应性 return,您就可以公开一些常用实用方法并将其作为 Mono.transform(Function<? super Mono<T>, ? extends Publisher<V>> transformer)
应用。