当消费者关闭并启动时,RabbitMQ 预取被忽略
RabbitMQ prefetch ignored when consumer is Down and gets Up
我的 basicQos 在消费者宕机时被忽略了,之后消费者又起来了。例如,假设消费者宕机了,生产者收到了 5 条消息。如果消费者不是 运行,如果 exchanger/queue 是持久的,这些消息将存储在磁盘中(我认为!)。
如果我将 basicQos 设置为 channel.basicQos(0, 3, true),我的消费者在启动时会收到超过 3 条消息。为什么?!?
另一方面,如果消费者在接收来自队列的消息时 运行,则一切正常(仅从队列中读取了 3 条消息)...我的代码如下:
factory = new ConnectionFactory();
factory.setHost(mRabbitMQHost); //may get server address from file configuration.
factory.setUsername(mRabbitMQUsername);
factory.setPassword(mRabbitMQPassword);
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability
consumer = new QueueingConsumer(channel);
for (QGQueues queue : QGQueues.values()) {
String queueName = queue.getQueueName();
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, "exchangeName", queue.getRoutingKey());
channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server
}
channel.basicQos(0, 3, true);
谢谢!
我敢打赌,您需要在执行任何其他操作之前设置 QoS。
将您的代码更改为此顺序:
channel = connection.createChannel();
// set QoS immediately
channel.basicQos(0, 3, true);
channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability
consumer = new QueueingConsumer(channel);
for (QGQueues queue : QGQueues.values()) {
String queueName = queue.getQueueName();
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, "exchangeName", queue.getRoutingKey());
channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server
}
这将确保在您尝试使用任何消息之前设置预取限制。
我的 basicQos 在消费者宕机时被忽略了,之后消费者又起来了。例如,假设消费者宕机了,生产者收到了 5 条消息。如果消费者不是 运行,如果 exchanger/queue 是持久的,这些消息将存储在磁盘中(我认为!)。
如果我将 basicQos 设置为 channel.basicQos(0, 3, true),我的消费者在启动时会收到超过 3 条消息。为什么?!?
另一方面,如果消费者在接收来自队列的消息时 运行,则一切正常(仅从队列中读取了 3 条消息)...我的代码如下:
factory = new ConnectionFactory();
factory.setHost(mRabbitMQHost); //may get server address from file configuration.
factory.setUsername(mRabbitMQUsername);
factory.setPassword(mRabbitMQPassword);
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability
consumer = new QueueingConsumer(channel);
for (QGQueues queue : QGQueues.values()) {
String queueName = queue.getQueueName();
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, "exchangeName", queue.getRoutingKey());
channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server
}
channel.basicQos(0, 3, true);
谢谢!
我敢打赌,您需要在执行任何其他操作之前设置 QoS。
将您的代码更改为此顺序:
channel = connection.createChannel();
// set QoS immediately
channel.basicQos(0, 3, true);
channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability
consumer = new QueueingConsumer(channel);
for (QGQueues queue : QGQueues.values()) {
String queueName = queue.getQueueName();
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, "exchangeName", queue.getRoutingKey());
channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server
}
这将确保在您尝试使用任何消息之前设置预取限制。