在反应式 Spring-Webflux 应用程序中使用 Spring AMQP @RabbitListener 时如何触发重试

How do I trigger retries when using Spring AMQP @RabbitListener in reactive Spring-Webflux app

我有一个 spring-webflux 应用程序,它必须使用来自 rabbitMQ 的消息。在以前的应用程序中,当不使用 spring-webflux 时,我能够:

  1. 声明队列时配置重试策略
  2. 使用@RabbitListener 注释设置兔子侦听器
  3. 通过在处理函数中抛出异常来触发重试

在 spring-webflux 中我无法抛出错误,我只有一个 MonoError,如何触发重试?

我的代码目前看起来像这样

@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {

  private final VehicleService service;
  private final OperationFactory operationFactory;

  @RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
  void handleVehicleUpdated(Message message) {
    Mono.just(message)
      .map(operationFactory::generateOperationFromMessage)
      .flatMap(service::handleOperation) // want to retry if downstream app is down
      .subscribe();
  }
}

编辑

我现在发现这是可能的。如果客户端代码例如 returns a Mono<Exception> 那么这将触发重试。同样,我可以有条件地触发重试映射到 Mono<Exception>。例如,如果我想在消息中的产品不存在时触发重试,我可以执行以下操作

repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> !exists)
        .flatMap(missing -> Mono.error(new Exception("my exception")))
        .then(...) // carry on if it does exist

将反应器与非反应性侦听器容器一起使用有很多挑战。

  1. 您必须使用手动确认和ack/nack反应流完成后的交付。
  2. 您必须使用 reactor 的重试机制。

考虑查看 https://github.com/reactor/reactor-rabbitmq 项目而不是 Spring AMQP。在未来的某个时候,我们希望构建响应式 @RabbitListener,但目前还没有。