RabbitMQ + Reactor,传递消息后发送ACK?
RabbitMQ + Reactor, sending ACK after passing on message?
我想弄清楚 Project-Reactor 是否适合我的用例。
我的申请需要高度一致性,我想:
- 收到来自 RabbitMQ 的消息
- 做一些manipulation/transformation
- 将消息放在(不同的)RabbitMQ 队列中
- 发送原始消息的 ACK
使用普通的 RabbitMQ Java 库,它大致看起来像这样:
var connectionFactory = new ConnectionFactory();
var connection = connectionFactory.newConnection();
var channel = connection.createChannel();
channel.txSelect();
channel.queueDeclare("queue.A", true, false, false, null);
channel.queueDeclare("queue.B", true, false, false, null);
channel.basicConsume("queue.A", false, (tag, delivery) ->
{
String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received: " + data);
channel.basicPublish("", "queue.B", null, data.getBytes(StandardCharsets.UTF_8));
channel.txCommit();
System.out.println("Sending ACK");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
channel.txCommit();
}, tag -> System.err.println("Tag " + tag + " failed"));
打印效果很好
Received: DATA
Sending ACK
使用 Reactor-RabbitMQ,我得出了以下结论:
var options = new ReceiverOptions();
var receiver = RabbitFlux.createReceiver(options);
var sender = RabbitFlux.createSender();
sender.declare(QueueSpecification.queue("queue.A")).block();
sender.declare(QueueSpecification.queue("queue.B")).block();
sender
.send(receiver.consumeManualAck("queue.A")
.doOnError(ex -> ex.printStackTrace())
.doOnEach(s ->
{
s.get().ack();
print("ACK");
})
.map(ad ->
{
print("MAP");
return new OutboundMessage("", "queue.B", ad.getBody());
}))
.doOnEach(v ->
{
print("SUB");
})
.subscribe();
然而,这打印
ACK
MAP
这显然不是正确的顺序,因为我在 map
之前执行 doOnEach
。但是 AcknowledgableDelivery
在我为发件人将其映射到 OutboundMessage
后不再可用。
请注意,SUB
永远不会打印出来。可能是因为 send
订阅是连续的,永远不会达到 Mono<Void>
终止状态。
我对 Reactor 的了解并不广泛,所以我觉得我缺少一些我可以在这里使用的东西。是否可以使用 Reactor 巧妙地完成此操作,还是我应该忘记它?
脚注:我使用 var
是因为我在 Java 11 中,打印语句作为量化顺序的一种方式执行了哪些事情。
我认为不可能在发送方法中进行确认。相反,您可以尝试:
receiver.consumeManualAck("queue.A")
.flatMap(ad -> {
Mono<OutboundMessage> outboundMessageMono =
Mono.just(new OutboundMessage("", "queue.B", ad.getBody()));
return sender.send(outboundMessageMono)
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
ad.ack();
} else {
ad.nack(false);
}
});
});
我想弄清楚 Project-Reactor 是否适合我的用例。
我的申请需要高度一致性,我想:
- 收到来自 RabbitMQ 的消息
- 做一些manipulation/transformation
- 将消息放在(不同的)RabbitMQ 队列中
- 发送原始消息的 ACK
使用普通的 RabbitMQ Java 库,它大致看起来像这样:
var connectionFactory = new ConnectionFactory();
var connection = connectionFactory.newConnection();
var channel = connection.createChannel();
channel.txSelect();
channel.queueDeclare("queue.A", true, false, false, null);
channel.queueDeclare("queue.B", true, false, false, null);
channel.basicConsume("queue.A", false, (tag, delivery) ->
{
String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received: " + data);
channel.basicPublish("", "queue.B", null, data.getBytes(StandardCharsets.UTF_8));
channel.txCommit();
System.out.println("Sending ACK");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
channel.txCommit();
}, tag -> System.err.println("Tag " + tag + " failed"));
打印效果很好
Received: DATA
Sending ACK
使用 Reactor-RabbitMQ,我得出了以下结论:
var options = new ReceiverOptions();
var receiver = RabbitFlux.createReceiver(options);
var sender = RabbitFlux.createSender();
sender.declare(QueueSpecification.queue("queue.A")).block();
sender.declare(QueueSpecification.queue("queue.B")).block();
sender
.send(receiver.consumeManualAck("queue.A")
.doOnError(ex -> ex.printStackTrace())
.doOnEach(s ->
{
s.get().ack();
print("ACK");
})
.map(ad ->
{
print("MAP");
return new OutboundMessage("", "queue.B", ad.getBody());
}))
.doOnEach(v ->
{
print("SUB");
})
.subscribe();
然而,这打印
ACK
MAP
这显然不是正确的顺序,因为我在 map
之前执行 doOnEach
。但是 AcknowledgableDelivery
在我为发件人将其映射到 OutboundMessage
后不再可用。
请注意,SUB
永远不会打印出来。可能是因为 send
订阅是连续的,永远不会达到 Mono<Void>
终止状态。
我对 Reactor 的了解并不广泛,所以我觉得我缺少一些我可以在这里使用的东西。是否可以使用 Reactor 巧妙地完成此操作,还是我应该忘记它?
脚注:我使用 var
是因为我在 Java 11 中,打印语句作为量化顺序的一种方式执行了哪些事情。
我认为不可能在发送方法中进行确认。相反,您可以尝试:
receiver.consumeManualAck("queue.A")
.flatMap(ad -> {
Mono<OutboundMessage> outboundMessageMono =
Mono.just(new OutboundMessage("", "queue.B", ad.getBody()));
return sender.send(outboundMessageMono)
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
ad.ack();
} else {
ad.nack(false);
}
});
});