如何在 pika 库上向方法 start_consuming() 添加超时

How to add a timeout to method start_consuming() on pika library

我有一个 BlockingConnection,我遵循 the examples 鼠兔文档。但在所有这些中,开始消费消息的代码示例是:

connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
connection.close()

(或多或少的细节)。

我必须编写很多脚本,我想一个接一个地 运行(为了 test/research 目的)。但是上面的代码要求我在每个代码中都添加 ^C。

我尝试添加一些超时 explained in the documentation,但我运气不好。例如,如果我找到一个参数,如果客户端在最后 X 秒内未使用任何消息,则设置,然后脚本完成。这在 pika lib 中可行吗?或者我必须改变方法?

如果您不想让代码阻塞,请不要使用 start_consuming。使用 SelectConnection 或使用 consumethis method。您可以为传递给 consume.

的参数添加超时

注意: RabbitMQ 团队监控 rabbitmq-users mailing list 并且有时只在 Whosebug 上回答问题。

import pika

parameters = pika.ConnectionParameters(host="localhost")
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

def ack_message(channel, method):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(method.delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        pass

def callback(channel,method, properties, body):
    ack_message(channel,method)
    print("body",body, flush=True)

channel.basic_consume(
    queue="hello", on_message_callback=callback)

channel.start_consuming()
connection.close()

我的原码是Luke Bakken的回答。
但是我稍微修改了代码。
:)