RabbitMQ 中的重新排队顺序是什么?
What is the requeue order in RabbitMQ?
根据 Consumer Acknowledgements 上的 RabbitMQ 文档:
When a message is requeued, it will be placed to its original position in its queue, if possible. If not (due to concurrent deliveries and acknowledgements from other consumers when multiple consumers share a queue), the message will be requeued to a position closer to queue head.
所以对于单个客户端消费者,如果服务器队列最初是
tail [c b a] head
并且客户端消费者消费了头部消息(“a”),服务器队列应该变成:
tail [c b] head
然后,如果客户端消费者 nacks 已处理的消息,则该消息应该在头部的服务器队列中重新排队(根据文档,它的“原始位置”)并且服务器队列应该变成:
tail [c b a] head
最终客户端消费者应该再次消费相同的头部消息(“a”)。
但这不是我使用 Python 库 Pika 观察到的结果。我观察到 nacked 消息在服务器队列的尾部重新排队,而不是在头部(“原始位置”)。 RabbitMQ 文档是否正确或库 Pika 是否正确?
示例代码:
import logging
import pika
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").propagate = False
parameters = pika.ConnectionParameters()
# Produce messages
with pika.BlockingConnection(parameters) as connection:
queue = "foobar"
routing_key = queue
channel = connection.channel()
channel.queue_declare(queue=queue)
for body in ["a", "b", "c"]:
channel.publish(exchange="", routing_key=routing_key, body=body)
logging.info(
"Produced message %r with routing key %r", body, routing_key
)
# Consume messages
def handle(channel, method, properties, body):
logging.info("Consumed message %r from queue %r", body.decode(), queue)
channel.basic_nack(method.delivery_tag)
with pika.BlockingConnection(parameters) as connection:
queue = "foobar"
channel = connection.channel()
channel.queue_declare(queue=queue)
channel.basic_consume(queue=queue, on_message_callback=handle)
channel.start_consuming()
输出:
INFO:root:Produced message 'a' with routing key 'foobar'
INFO:root:Produced message 'b' with routing key 'foobar'
INFO:root:Produced message 'c' with routing key 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'
您遇到的行为很可能是预取行为造成的。
由于您没有指定所需的服务质量,我相信(希望有更多知识渊博的消息来源来确认这一点?)预取是由服务器决定的,并且可能会非常高。
想法是为了性能问题,一个客户端可以获得多条消息,这在大多数情况下是有利的:
- 如果消费者端有多线程,他可能可以并行处理多条消息,因此在任何给定时间会有多条消息尚未确认
- 为了在 "happy" 情况下允许更流畅的处理,客户端可以确认消息块,让服务器知道直到给定消息,消费者收到的所有消息都被确认,它减少了开销当我们有大量消息需要很少处理的情况时
如果您查看下面的文档链接,它们会解释您如何控制该行为。
有关这些要点的更多信息,请访问:
谢谢@Olivier。使用 channel.basic_qos(prefetch_count=1)
我得到记录的行为:
INFO:root:Produced message 'a' with routing key 'foobar'
INFO:root:Produced message 'b' with routing key 'foobar'
INFO:root:Produced message 'c' with routing key 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
根据 Consumer Acknowledgements 上的 RabbitMQ 文档:
When a message is requeued, it will be placed to its original position in its queue, if possible. If not (due to concurrent deliveries and acknowledgements from other consumers when multiple consumers share a queue), the message will be requeued to a position closer to queue head.
所以对于单个客户端消费者,如果服务器队列最初是
tail [c b a] head
并且客户端消费者消费了头部消息(“a”),服务器队列应该变成:
tail [c b] head
然后,如果客户端消费者 nacks 已处理的消息,则该消息应该在头部的服务器队列中重新排队(根据文档,它的“原始位置”)并且服务器队列应该变成:
tail [c b a] head
最终客户端消费者应该再次消费相同的头部消息(“a”)。
但这不是我使用 Python 库 Pika 观察到的结果。我观察到 nacked 消息在服务器队列的尾部重新排队,而不是在头部(“原始位置”)。 RabbitMQ 文档是否正确或库 Pika 是否正确?
示例代码:
import logging
import pika
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").propagate = False
parameters = pika.ConnectionParameters()
# Produce messages
with pika.BlockingConnection(parameters) as connection:
queue = "foobar"
routing_key = queue
channel = connection.channel()
channel.queue_declare(queue=queue)
for body in ["a", "b", "c"]:
channel.publish(exchange="", routing_key=routing_key, body=body)
logging.info(
"Produced message %r with routing key %r", body, routing_key
)
# Consume messages
def handle(channel, method, properties, body):
logging.info("Consumed message %r from queue %r", body.decode(), queue)
channel.basic_nack(method.delivery_tag)
with pika.BlockingConnection(parameters) as connection:
queue = "foobar"
channel = connection.channel()
channel.queue_declare(queue=queue)
channel.basic_consume(queue=queue, on_message_callback=handle)
channel.start_consuming()
输出:
INFO:root:Produced message 'a' with routing key 'foobar'
INFO:root:Produced message 'b' with routing key 'foobar'
INFO:root:Produced message 'c' with routing key 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'
您遇到的行为很可能是预取行为造成的。
由于您没有指定所需的服务质量,我相信(希望有更多知识渊博的消息来源来确认这一点?)预取是由服务器决定的,并且可能会非常高。
想法是为了性能问题,一个客户端可以获得多条消息,这在大多数情况下是有利的:
- 如果消费者端有多线程,他可能可以并行处理多条消息,因此在任何给定时间会有多条消息尚未确认
- 为了在 "happy" 情况下允许更流畅的处理,客户端可以确认消息块,让服务器知道直到给定消息,消费者收到的所有消息都被确认,它减少了开销当我们有大量消息需要很少处理的情况时
如果您查看下面的文档链接,它们会解释您如何控制该行为。
有关这些要点的更多信息,请访问:
谢谢@Olivier。使用 channel.basic_qos(prefetch_count=1)
我得到记录的行为:
INFO:root:Produced message 'a' with routing key 'foobar'
INFO:root:Produced message 'b' with routing key 'foobar'
INFO:root:Produced message 'c' with routing key 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'