Nack当前消息之前的所有消息和Ack当前消息(rabbitmq,java)

Nack all messages before current message and Ack current message (rabbitmq, java)

channel.basicQos(1);

while (true) {
    GetResponse res = channel.basicGet(TEST_QUEUE, false);
    if (res != null) {
        deliveryTag = res.getEnvelope().getDeliveryTag();
    }

    // Handle all messages If the condition is true
    if (condition) {
        // nack all messages unhandled previously
        channel.basicNack(deliveryTag - 1, true, true);

        // ack current message only
        channel.basicAck(deliveryTag, false);
    }
    else {
        // Do not handle current message and continue to get next one
    }           
}   

Q1。
我不确定我是否可以同时使用 nack 和 ack。
我可以使用 deliveryTag - 1 来指示所有以前的消息吗?

简而言之,我想跳过所有不满足if条件的消息。
如果当前消息满足条件,则 nack 所有跳过的消息并确认当前消息。
通过这样做,我想延迟处理某些特定消息。

Q2.
恐怕如果我写 while (true) 并且有多个工人 运行 那么 channel.basicQos(1) 将无法按预期工作。
我应该写这样的代码来限制计数吗?或者我应该怎么写才能保证所有其他worker都能平均收到消息?

int prefetch = 1;
int count = 0;
while (count++ <= prefetch) {
}

Q3.
我注意到只要连接打开,工作程序就不会终止。
连接将打开多长时间,我是否需要手动关闭它?

最后, RabbitMQ java 客户端 API vs AmqpTemplate vs RabbitTemplate 在这种情况下哪个更合适(不使用 MessageListener(ChannelAwareMessageListener) 模型)?

Q1 - 它应该可以正常工作。有没有试过发现问题?是的,每次交付都会增加标签。

Q2 - basicQosbasicGet() 无关 - 它仅与 basicConsume() 一起使用。

Q3 - 完成后需要关闭连接。

终于;这取决于。如果你想要 Spring 的更高级别的支持(消息转换等),那么使用它;如果要处理原始数据 API,请不要使用 Spring。

RabbitTemplate 不直接支持 basicGet 用户管理 acks/nacks,除非通过其带有频道回调的 execute 方法。