为什么 RabbitMQ 队列中的所有消息都未被确认?

Why are all messages in the RabbitMQ queue unacknowledged?

我有一个单节点 RabbitMQ 服务器设置,带有一个名为 generated_buckets 的队列。然后我有一个简单的 Python 消费者来消费消息,如下所示:

def scan_from_mq(server: str, port: int, queue_name: str) -> None:
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=server, port=port))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)

    def callback(ch, method, properties, body):
        print(body)
        time.sleep(1)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue='generated_buckets', on_message_callback=callback, auto_ack=False)

    print(' [*] Waiting for messages. To exit press Ctrl+C')
    channel.start_consuming()

出于某种原因,第二个 运行 队列中的消费者 all 条消息变得未被确认。

消费者开始之前:

root@rmq0:/# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
generated_buckets       407     0

消费者启动后:

root@rmq0:/# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
generated_buckets       0       372

有没有我遗漏的简单的东西?

我发现这被称为 Consumer prefetch 并且可以在设置频道时对其进行限制。我用这条线修复了它:

channel.basic_qos(prefetch_count=200, global_qos=True)

现在消费者消费时只有 200 条未确认的消息。