Rabbitmq headers 交换并确认交付
Rabbitmq headers exchange and confirmed delivery
我正在尝试在 RabbitMQ 上使用 headers 交换,混合使用 java 和 python 组件,我需要确认交付。
我的行为似乎与 python (pika) 和 java 客户不同。
在python中:
channel.exchange_declare(exchange='headers_test',
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers',
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True)
channel.confirm_delivery()
result = channel.basic_publish(exchange='headers_test',
¦ ¦ ¦ ¦ ¦ ¦ routing_key='',
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True,
¦ ¦ ¦ ¦ ¦ ¦ body=message,
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2,
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers))
如果 headers 不匹配任何绑定的消费者并且无法路由消息,结果为 false
但是在java/scala:
channel.exchangeDeclare("headers_test", "headers", true, false, null)
channel.confirmSelect
val props = MessageProperties.PERSISTENT_BASIC.builder
¦ ¦ ¦ ¦ .headers(messageHeaders).build
channel.basicPublish("headers_test",
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory
¦ ¦ ¦ ¦ ¦ ¦props,
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes)
channel.waitForConfirmsOrDie()
在这里,当 messageHeaders 找不到匹配项时,消息似乎只是被丢弃而没有错误。
我是不是遗漏了什么,或者两个客户的行为真的不同?我怎样才能在 java 中使用 headers 兑换确认送达?
注意:我已经有 "complex" 交换到 queues 路由设置,我宁愿避免将 dead-letters 路由添加到游戏中,而只是 fail-on-send。
即使没有 queue 与您的 headers 匹配,消息也被视为已确认的问题。来自文档 (https://www.rabbitmq.com/confirms.html):
For unroutable messages, the broker will issue a confirm once the
exchange verifies a message won't route to any queue (returns an empty
list of queues). If the message is also published as mandatory, the
basic.return is sent to the client before basic.ack. The same is true
for negative acknowledgements (basic.nack).
相反,您应该检查 basic.return 消息以检测消息是否已被路由。
我已经使用 wireshark 进行了检查,确实我可以看到如果消息未被路由,则存在 AMQP basic.return 消息。
我想你应该从
开始
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
事实上,如果消息没有被路由,我会得到这个:
replyCode = [312], replyText = [NO_ROUTE], exchange = [headers_logs],
routingKey = [], pro....
此外,如果您想在 Java 中模拟 Pika 的同步行为,您似乎可以通过在发布消息之前获取当前发布序列号并注册确认侦听器而不是依赖 .waitForConfirmsOrDie( ).
所以完整的代码示例是:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleAck");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleNack");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
});
long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);
channel.basicPublish("headers_logs",
"",
true,
props,
"data".getBytes());
并且在 return/confirm 回调中,您需要查找在发布消息之前获得的频道发布序列号。
如果您查看线路上发生的情况,如果消息尚未路由到任何 queue,RabbitMq 会发回一条 basic.return 消息,其中还包含一条确认信息 (交货标签)。如果消息已被路由,RabbitMq 会发回一条 bacic.ack 消息,其中还包含一条确认消息。
RabbitMq Java 客户端似乎总是在 basicConfirm() 之前调用 basicReturn() 回调,因此判断消息是否已路由的逻辑可以是这样的:
注册 return 并确认频道上的听众;
记住频道的下一个发布序列号;
等待 return 或确认回调。如果是 return 回调 - 消息尚未路由,您应该忽略对同一投递标签的进一步确认。如果您在收到 handleReturn() 之前收到 handleAck() 回调,则表示消息已路由到 queue.
虽然我不确定在什么情况下可以调用.handleNack()。
我正在尝试在 RabbitMQ 上使用 headers 交换,混合使用 java 和 python 组件,我需要确认交付。
我的行为似乎与 python (pika) 和 java 客户不同。
在python中:
channel.exchange_declare(exchange='headers_test',
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers',
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True)
channel.confirm_delivery()
result = channel.basic_publish(exchange='headers_test',
¦ ¦ ¦ ¦ ¦ ¦ routing_key='',
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True,
¦ ¦ ¦ ¦ ¦ ¦ body=message,
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2,
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers))
如果 headers 不匹配任何绑定的消费者并且无法路由消息,结果为 false
但是在java/scala:
channel.exchangeDeclare("headers_test", "headers", true, false, null)
channel.confirmSelect
val props = MessageProperties.PERSISTENT_BASIC.builder
¦ ¦ ¦ ¦ .headers(messageHeaders).build
channel.basicPublish("headers_test",
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory
¦ ¦ ¦ ¦ ¦ ¦props,
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes)
channel.waitForConfirmsOrDie()
在这里,当 messageHeaders 找不到匹配项时,消息似乎只是被丢弃而没有错误。
我是不是遗漏了什么,或者两个客户的行为真的不同?我怎样才能在 java 中使用 headers 兑换确认送达?
注意:我已经有 "complex" 交换到 queues 路由设置,我宁愿避免将 dead-letters 路由添加到游戏中,而只是 fail-on-send。
即使没有 queue 与您的 headers 匹配,消息也被视为已确认的问题。来自文档 (https://www.rabbitmq.com/confirms.html):
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).
相反,您应该检查 basic.return 消息以检测消息是否已被路由。
我已经使用 wireshark 进行了检查,确实我可以看到如果消息未被路由,则存在 AMQP basic.return 消息。
我想你应该从
开始channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
事实上,如果消息没有被路由,我会得到这个:
replyCode = [312], replyText = [NO_ROUTE], exchange = [headers_logs], routingKey = [], pro....
此外,如果您想在 Java 中模拟 Pika 的同步行为,您似乎可以通过在发布消息之前获取当前发布序列号并注册确认侦听器而不是依赖 .waitForConfirmsOrDie( ).
所以完整的代码示例是:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleAck");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleNack");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
});
long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);
channel.basicPublish("headers_logs",
"",
true,
props,
"data".getBytes());
并且在 return/confirm 回调中,您需要查找在发布消息之前获得的频道发布序列号。
如果您查看线路上发生的情况,如果消息尚未路由到任何 queue,RabbitMq 会发回一条 basic.return 消息,其中还包含一条确认信息 (交货标签)。如果消息已被路由,RabbitMq 会发回一条 bacic.ack 消息,其中还包含一条确认消息。
RabbitMq Java 客户端似乎总是在 basicConfirm() 之前调用 basicReturn() 回调,因此判断消息是否已路由的逻辑可以是这样的:
注册 return 并确认频道上的听众; 记住频道的下一个发布序列号; 等待 return 或确认回调。如果是 return 回调 - 消息尚未路由,您应该忽略对同一投递标签的进一步确认。如果您在收到 handleReturn() 之前收到 handleAck() 回调,则表示消息已路由到 queue.
虽然我不确定在什么情况下可以调用.handleNack()。