为什么 RabbitMQ 队列中的所有消息都未被确认?
Why are all messages in the RabbitMQ queue unacknowledged?
我有一个单节点 RabbitMQ 服务器设置,带有一个名为 generated_buckets
的队列。然后我有一个简单的 Python 消费者来消费消息,如下所示:
def scan_from_mq(server: str, port: int, queue_name: str) -> None:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=server, port=port))
channel = connection.channel()
channel.queue_declare(queue=queue_name)
def callback(ch, method, properties, body):
print(body)
time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='generated_buckets', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press Ctrl+C')
channel.start_consuming()
出于某种原因,第二个 运行 队列中的消费者 all 条消息变得未被确认。
消费者开始之前:
root@rmq0:/# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
generated_buckets 407 0
消费者启动后:
root@rmq0:/# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
generated_buckets 0 372
有没有我遗漏的简单的东西?
我发现这被称为 Consumer prefetch 并且可以在设置频道时对其进行限制。我用这条线修复了它:
channel.basic_qos(prefetch_count=200, global_qos=True)
现在消费者消费时只有 200 条未确认的消息。
我有一个单节点 RabbitMQ 服务器设置,带有一个名为 generated_buckets
的队列。然后我有一个简单的 Python 消费者来消费消息,如下所示:
def scan_from_mq(server: str, port: int, queue_name: str) -> None:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=server, port=port))
channel = connection.channel()
channel.queue_declare(queue=queue_name)
def callback(ch, method, properties, body):
print(body)
time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='generated_buckets', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press Ctrl+C')
channel.start_consuming()
出于某种原因,第二个 运行 队列中的消费者 all 条消息变得未被确认。
消费者开始之前:
root@rmq0:/# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
generated_buckets 407 0
消费者启动后:
root@rmq0:/# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
generated_buckets 0 372
有没有我遗漏的简单的东西?
我发现这被称为 Consumer prefetch 并且可以在设置频道时对其进行限制。我用这条线修复了它:
channel.basic_qos(prefetch_count=200, global_qos=True)
现在消费者消费时只有 200 条未确认的消息。