在反应式 Spring-Webflux 应用程序中使用 Spring AMQP @RabbitListener 时如何触发重试
How do I trigger retries when using Spring AMQP @RabbitListener in reactive Spring-Webflux app
我有一个 spring-webflux 应用程序,它必须使用来自 rabbitMQ 的消息。在以前的应用程序中,当不使用 spring-webflux 时,我能够:
- 声明队列时配置重试策略
- 使用@RabbitListener 注释设置兔子侦听器
- 通过在处理函数中抛出异常来触发重试
在 spring-webflux 中我无法抛出错误,我只有一个 MonoError,如何触发重试?
我的代码目前看起来像这样
@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {
private final VehicleService service;
private final OperationFactory operationFactory;
@RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
void handleVehicleUpdated(Message message) {
Mono.just(message)
.map(operationFactory::generateOperationFromMessage)
.flatMap(service::handleOperation) // want to retry if downstream app is down
.subscribe();
}
}
编辑
我现在发现这是可能的。如果客户端代码例如 returns a Mono<Exception>
那么这将触发重试。同样,我可以有条件地触发重试映射到 Mono<Exception>
。例如,如果我想在消息中的产品不存在时触发重试,我可以执行以下操作
repository.findByProductId(product.getProductId())
.hasElement()
.filter(exists -> !exists)
.flatMap(missing -> Mono.error(new Exception("my exception")))
.then(...) // carry on if it does exist
将反应器与非反应性侦听器容器一起使用有很多挑战。
- 您必须使用手动确认和ack/nack反应流完成后的交付。
- 您必须使用 reactor 的重试机制。
考虑查看 https://github.com/reactor/reactor-rabbitmq 项目而不是 Spring AMQP。在未来的某个时候,我们希望构建响应式 @RabbitListener
,但目前还没有。
我有一个 spring-webflux 应用程序,它必须使用来自 rabbitMQ 的消息。在以前的应用程序中,当不使用 spring-webflux 时,我能够:
- 声明队列时配置重试策略
- 使用@RabbitListener 注释设置兔子侦听器
- 通过在处理函数中抛出异常来触发重试
在 spring-webflux 中我无法抛出错误,我只有一个 MonoError,如何触发重试?
我的代码目前看起来像这样
@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {
private final VehicleService service;
private final OperationFactory operationFactory;
@RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
void handleVehicleUpdated(Message message) {
Mono.just(message)
.map(operationFactory::generateOperationFromMessage)
.flatMap(service::handleOperation) // want to retry if downstream app is down
.subscribe();
}
}
编辑
我现在发现这是可能的。如果客户端代码例如 returns a Mono<Exception>
那么这将触发重试。同样,我可以有条件地触发重试映射到 Mono<Exception>
。例如,如果我想在消息中的产品不存在时触发重试,我可以执行以下操作
repository.findByProductId(product.getProductId())
.hasElement()
.filter(exists -> !exists)
.flatMap(missing -> Mono.error(new Exception("my exception")))
.then(...) // carry on if it does exist
将反应器与非反应性侦听器容器一起使用有很多挑战。
- 您必须使用手动确认和ack/nack反应流完成后的交付。
- 您必须使用 reactor 的重试机制。
考虑查看 https://github.com/reactor/reactor-rabbitmq 项目而不是 Spring AMQP。在未来的某个时候,我们希望构建响应式 @RabbitListener
,但目前还没有。