Nack当前消息之前的所有消息和Ack当前消息(rabbitmq,java)
Nack all messages before current message and Ack current message (rabbitmq, java)
channel.basicQos(1);
while (true) {
GetResponse res = channel.basicGet(TEST_QUEUE, false);
if (res != null) {
deliveryTag = res.getEnvelope().getDeliveryTag();
}
// Handle all messages If the condition is true
if (condition) {
// nack all messages unhandled previously
channel.basicNack(deliveryTag - 1, true, true);
// ack current message only
channel.basicAck(deliveryTag, false);
}
else {
// Do not handle current message and continue to get next one
}
}
Q1。
我不确定我是否可以同时使用 nack 和 ack。
我可以使用 deliveryTag - 1 来指示所有以前的消息吗?
简而言之,我想跳过所有不满足if条件的消息。
如果当前消息满足条件,则 nack 所有跳过的消息并确认当前消息。
通过这样做,我想延迟处理某些特定消息。
Q2.
恐怕如果我写 while (true) 并且有多个工人 运行 那么 channel.basicQos(1) 将无法按预期工作。
我应该写这样的代码来限制计数吗?或者我应该怎么写才能保证所有其他worker都能平均收到消息?
int prefetch = 1;
int count = 0;
while (count++ <= prefetch) {
}
Q3.
我注意到只要连接打开,工作程序就不会终止。
连接将打开多长时间,我是否需要手动关闭它?
最后,
RabbitMQ java 客户端 API vs AmqpTemplate vs RabbitTemplate 在这种情况下哪个更合适(不使用 MessageListener(ChannelAwareMessageListener) 模型)?
Q1 - 它应该可以正常工作。有没有试过发现问题?是的,每次交付都会增加标签。
Q2 - basicQos
与 basicGet()
无关 - 它仅与 basicConsume()
一起使用。
Q3 - 完成后需要关闭连接。
终于;这取决于。如果你想要 Spring 的更高级别的支持(消息转换等),那么使用它;如果要处理原始数据 API,请不要使用 Spring。
RabbitTemplate
不直接支持 basicGet
用户管理 acks/nacks,除非通过其带有频道回调的 execute
方法。
channel.basicQos(1);
while (true) {
GetResponse res = channel.basicGet(TEST_QUEUE, false);
if (res != null) {
deliveryTag = res.getEnvelope().getDeliveryTag();
}
// Handle all messages If the condition is true
if (condition) {
// nack all messages unhandled previously
channel.basicNack(deliveryTag - 1, true, true);
// ack current message only
channel.basicAck(deliveryTag, false);
}
else {
// Do not handle current message and continue to get next one
}
}
Q1。
我不确定我是否可以同时使用 nack 和 ack。
我可以使用 deliveryTag - 1 来指示所有以前的消息吗?
简而言之,我想跳过所有不满足if条件的消息。
如果当前消息满足条件,则 nack 所有跳过的消息并确认当前消息。
通过这样做,我想延迟处理某些特定消息。
Q2.
恐怕如果我写 while (true) 并且有多个工人 运行 那么 channel.basicQos(1) 将无法按预期工作。
我应该写这样的代码来限制计数吗?或者我应该怎么写才能保证所有其他worker都能平均收到消息?
int prefetch = 1;
int count = 0;
while (count++ <= prefetch) {
}
Q3.
我注意到只要连接打开,工作程序就不会终止。
连接将打开多长时间,我是否需要手动关闭它?
最后, RabbitMQ java 客户端 API vs AmqpTemplate vs RabbitTemplate 在这种情况下哪个更合适(不使用 MessageListener(ChannelAwareMessageListener) 模型)?
Q1 - 它应该可以正常工作。有没有试过发现问题?是的,每次交付都会增加标签。
Q2 - basicQos
与 basicGet()
无关 - 它仅与 basicConsume()
一起使用。
Q3 - 完成后需要关闭连接。
终于;这取决于。如果你想要 Spring 的更高级别的支持(消息转换等),那么使用它;如果要处理原始数据 API,请不要使用 Spring。
RabbitTemplate
不直接支持 basicGet
用户管理 acks/nacks,除非通过其带有频道回调的 execute
方法。