为什么我的 channel.basicConsume 不等待消息

How comes my channel.basicConsume does not wait for messages

每当我启动以下代码时:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "direct_logs";
    channel.exchangeDeclare(exchangeName, "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, "red");
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException{
            String message = new String(body, "UTF-8");
            System.out.println(message);
            System.out.println("message received");
        }
    };

    channel.basicConsume(queueName, true, consumer);

它不会像文档中暗示的那样开始无限循环。相反,它会立即停止。 我可以让它消耗一段时间的唯一方法是用循环替换 channel.basicConsume ,如下所示:

    DateTime startedAt = new DateTime();
    DateTime stopAt = startedAt.plusSeconds(60);
    long i=0;
    try {
        while (stopAt.compareTo(new DateTime()) > 0) {
            channel.basicConsume(queueName, true, consumer);
            i++;
        }
    }finally {
        System.out.println(new DateTime());
        System.out.println(startedAt);
        System.out.println(stopAt);
        System.out.println(i);
    }

一定有更好的方法来听一会儿消息,对吗?我错过了什么? 它会立即停止收听。

你确定它停止了吗? basicConsume 所做的是注册一个消费者来监听特定的队列,因此不需要在循环中执行它。你只执行一次,你传递的 Consumer 实例的 handleDelivery 方法将在消息到达时被调用。

rabbitmq 库创建的线程应防止 JVM 退出。为了退出程序,您实际上应该调用 connection.close()

这是来自 rabbitmq 的完整接收器示例:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java

其实和你的差不多。

我遇到了同样的问题。原因是我最后打电话给 connection.close。但是,basicConsume() 方法不会阻塞在当前线程上,而是阻塞在其他线程上,因此它后面的代码,即 connection.close() 会立即被调用。