如何 Ack/Nack 在 Spring AMQP 中使用反应式 RabbitListener?

How to Ack/Nack with reactive RabbitListener in Spring AMQP?

我正在使用 Spring AMQP 2.1.6 来使用 RabbitListener 返回 Mono<Void> 的消息。例如:

@RabbitListener
public Mono<Void> myListener(MyMessage myMessage) {
    Mono<Void> mono = myService.doSomething(myMessage);
    return mono;
}

阅读 documentation 它说:

The listener container factory must be configured with AcknowledgeMode.MANUAL so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack the message when the async operation completes.

因此,我已经用 AcknowledgeMode.MANUAL 配置了容器工厂,但我不清楚 "the asynchronous completion will ack or nack the message when the async operation completes" 是否意味着这是由 spring-amqp 本身处理的,或者这是不是我要做? IE。在调用 myService.doSomething(myMessage) 之后我是否必须 ack/nack 消息或者 Spring AMQP 自动确认它因为我正在返回 Mono (即使 AcknowledgeMode.MANUAL 设置)?

如果我需要手动发送 ack 或 reject,那么在使用 RabbitListener 时以非阻塞方式执行此操作的惯用方法是什么?

当 Mono 完成时,侦听器适配器负责处理确认。

参见 AbstractAdaptableMessageListener.asyncSuccess()asyncFailure()

编辑

我不是反应堆人,但据我所知,完成 Mono<Void> 没有任何作用,因此永远不会调用 on...() 方法。

您可以使用 channel.basicAckbasicReject...

手动确认递送
@RabbitListener(queues = "foo")
public void listen(String in, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    ...
}