RabbitMQ:多消息和单一消费者

RabbitMQ: multiple messages and single consumer

我对 RabbitMQ 消费者有疑问。实际上我有一个消费者从三个队列中获取消息。问题是我需要从每个消息中获取多条消息,但我的消费者每个队列只获取一条消息并结束获取。如果有人能帮我解决这个问题,我将不胜感激。

消费代码如下

        for (int i = 0; i < queueNames.size(); i++) {

        Channel channel = connection.createChannel();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);

        flag = true;
        while (flag) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(routingKey);
            String message = new String(delivery.getBody(), "UTF-8");

                flag = false;
        }
    }

其中 queueNames 是包含我的队列名称的列表(数量为 3)。

您需要订阅队列,一个消费者只会按照您定义的方式消费 1 条消息

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
 new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties properties,
                                byte[] body)
         throws IOException
     {
         String routingKey = envelope.getRoutingKey();
         String contentType = properties.getContentType();
         long deliveryTag = envelope.getDeliveryTag();
         // (process the message components here ...)
         channel.basicAck(deliveryTag, false);
     }
 });

更多信息在这里:https://www.rabbitmq.com/api-guide.html

好的,我这样解决问题:

boolean flag;
    System.out.println("Rozmiar queue " + queueNames.size());
    for (int i = 0; i < queueNames.size(); i++) {

        Channel channel = connection.createChannel();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);

        flag = true;
        while (flag) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout);
            if (delivery == null) {
                flag = false;
            } else {

                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Message Received '" + message + "'");
            }
        }
    }

我希望这个解决方案可以帮助将来的人:)