Pika - RabbitMQ - 为什么在对消费者使用预取计数 1 时我的交付率大于确认率
Pika - RabbitMQ - Why is my deliver-rate greater than acknowledge rate when using prefetch-count 1 for consumer
我在 Python 中遇到 RabbitMQ 实现 PIKA 的问题。
我想从队列中使用 1 条消息,使用它并在工作完成后确认它。然后应该收到下一条消息。
我使用了 prefetch_count = 1 选项,告诉 rabbitMQ 这个消费者一次只想要 1 条消息,并且在这条消息被确认之前不需要新消息。
这是我的(非常简单的)代码:
credentials = pika.PlainCredentials("username","password")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='1.2.3.4', credentials=credentials))
channel = connection.channel()
def consume(ch, method, properties, body):
time.sleep(5) # Here is the work, now just hold 5 seconds
ch.basic_ack(method.delivery_tag)
def init():
channel.basic_consume(
queue="raw.archive", on_message_callback=consume, auto_ack=False)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
if __name__ == "__main__":
init()
所以我的问题是,为什么 rabbitmq 传送的文档(40/秒)多于确认的(0.20/秒,正确,因为有 5 秒的暂停)。这两个不应该相等吗?
此外,Unacked 值 (1650) 永远不应大于 1,因为它不应传送任何文档,直到该文档得到确认。
第二个视图显示,消费者没有预取计数。但是预取计数是在连接上设置的。也许我必须将它设置为消费者,但我不知道如何设置它。
我做错了什么?
提前致谢。
经马塞尔确认,
问题与在频道上设置 basic_qos 时有关。
它似乎应该设置在 basic_consume.
之前
def init():
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue="raw.archive", on_message_callback=consume, auto_ack=False)
channel.start_consuming()
我在 Python 中遇到 RabbitMQ 实现 PIKA 的问题。 我想从队列中使用 1 条消息,使用它并在工作完成后确认它。然后应该收到下一条消息。
我使用了 prefetch_count = 1 选项,告诉 rabbitMQ 这个消费者一次只想要 1 条消息,并且在这条消息被确认之前不需要新消息。
这是我的(非常简单的)代码:
credentials = pika.PlainCredentials("username","password")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='1.2.3.4', credentials=credentials))
channel = connection.channel()
def consume(ch, method, properties, body):
time.sleep(5) # Here is the work, now just hold 5 seconds
ch.basic_ack(method.delivery_tag)
def init():
channel.basic_consume(
queue="raw.archive", on_message_callback=consume, auto_ack=False)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
if __name__ == "__main__":
init()
所以我的问题是,为什么 rabbitmq 传送的文档(40/秒)多于确认的(0.20/秒,正确,因为有 5 秒的暂停)。这两个不应该相等吗? 此外,Unacked 值 (1650) 永远不应大于 1,因为它不应传送任何文档,直到该文档得到确认。
我做错了什么?
提前致谢。
经马塞尔确认,
问题与在频道上设置 basic_qos 时有关。 它似乎应该设置在 basic_consume.
之前def init():
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue="raw.archive", on_message_callback=consume, auto_ack=False)
channel.start_consuming()