RabbitMQ channel.addConfirmListener() ,接口 ackCallback 缺少一些回调?

RabbitMQ channel.addConfirmListener() , interface ackCallback Some callbacks are missing?

这是我的代码,channel.addConfirmListener() ackCallback 会丢失一些回调,消息确实发送到rabbitmq服务器可以正常消费了,但是我发送消息后休眠了2ms,并且所有ack回调都能收到,

我不知道这是我的代码错误还是 rabbitmq 错误

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

@Log4j2
public class 异步确认发布{
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(7005);
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 开启确认发布
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
    
        channel.queueDeclare("hello", true, false, false, null);
        //  异步确认发布消息 回调
        channel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, send successful", deliveryTag);
                },
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, fail in send", deliveryTag);
                }
        );
        for (int i = 0; i < 5; i++) {
            String message = "Hello World!!!   " + i;
            channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

控制台显示缺少一些回调

17:04:29.607 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:04:29.615 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

但是我发送消息后休眠了2ms,所有回调都能收到

示例代码

for (int i = 0; i < 5; i++) {
    String message = "Hello World!!!   " + i;
    channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
    Thread.sleep(2);  // I sleep for 2ms after sending the message, and all ack callbacks can be received
}

控制台日志

17:05:18.037 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>1, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>2, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>3, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

我的RabbitMQ Server版本是3.9.14(没有修改配置,使用默认配置),Erlang 24.3.2,

Maven项目依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.18.RELEASE</version>
</dependency>

我试图阻止主线程关闭,但这似乎不是主线程关闭的原因,因为主线程不会在创建连接后自动关闭

我不确定你为什么用 标记它,因为你根本没有使用 spring-rabbit API;您正在直接使用 amqp-client。

这是按设计工作的;出于性能原因,确认回调在 true 时具有附加参数 multiple;这意味着所有标签(包括这个标签)都通过一次确认得到确认。

https://www.rabbitmq.com/tutorials/tutorial-seven-java.html

multiple: this is a boolean value. If false, only one message is confirmed/nack-ed, if true, all messages with a lower or equal sequence number are confirmed/nack-ed.