RabbitMQ:多消息和单一消费者
RabbitMQ: multiple messages and single consumer
我对 RabbitMQ 消费者有疑问。实际上我有一个消费者从三个队列中获取消息。问题是我需要从每个消息中获取多条消息,但我的消费者每个队列只获取一条消息并结束获取。如果有人能帮我解决这个问题,我将不胜感激。
消费代码如下
for (int i = 0; i < queueNames.size(); i++) {
Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);
flag = true;
while (flag) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(routingKey);
String message = new String(delivery.getBody(), "UTF-8");
flag = false;
}
}
其中 queueNames 是包含我的队列名称的列表(数量为 3)。
您需要订阅队列,一个消费者只会按照您定义的方式消费 1 条消息
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
好的,我这样解决问题:
boolean flag;
System.out.println("Rozmiar queue " + queueNames.size());
for (int i = 0; i < queueNames.size(); i++) {
Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);
flag = true;
while (flag) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout);
if (delivery == null) {
flag = false;
} else {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Message Received '" + message + "'");
}
}
}
我希望这个解决方案可以帮助将来的人:)
我对 RabbitMQ 消费者有疑问。实际上我有一个消费者从三个队列中获取消息。问题是我需要从每个消息中获取多条消息,但我的消费者每个队列只获取一条消息并结束获取。如果有人能帮我解决这个问题,我将不胜感激。
消费代码如下
for (int i = 0; i < queueNames.size(); i++) {
Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);
flag = true;
while (flag) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(routingKey);
String message = new String(delivery.getBody(), "UTF-8");
flag = false;
}
}
其中 queueNames 是包含我的队列名称的列表(数量为 3)。
您需要订阅队列,一个消费者只会按照您定义的方式消费 1 条消息
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
好的,我这样解决问题:
boolean flag;
System.out.println("Rozmiar queue " + queueNames.size());
for (int i = 0; i < queueNames.size(); i++) {
Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);
flag = true;
while (flag) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout);
if (delivery == null) {
flag = false;
} else {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Message Received '" + message + "'");
}
}
}
我希望这个解决方案可以帮助将来的人:)