是否可以使用 pika 从每个队列中消费?
Is it possible to consume from every queue with pika?
我正在尝试设置一个程序,该程序将从 RabbitMQ 中的每个队列中消费,并根据某些消息将 运行 某些脚本。不幸的是,如果 运行 出现单个错误(即超时或未找到队列),则在添加消费者时整个通道都死了。此外,队列来来去去,因此它必须经常刷新队列列表。这可能吗?
到目前为止,这是我的代码。
import pika
import requests
import sys
try:
host = sys.argv[1]
except:
host = "localhost"
def get_queues(host="localhost", port=15672, user="guest", passwd="guest", virtual_host=None):
url = 'http://%s:%s/api/queues/%s' % (host, port, virtual_host or '')
response = requests.get(url, auth=(user, passwd))
return response.json()
queues = get_queues(host)
def get_on_message(queue):
def on_message(channel, method_frame, header_frame, body):
print("message from", queue)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
return on_message
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
for queue in queues:
print(channel.is_open)
try:
channel.basic_consume(get_on_message(queue["name"]), queue["name"])
print("queue added",queue["name"])
except Exception as e:
print("queue failed",queue["name"])
sys.exit()
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
是否有正确的方法来做到这一点,或者这样做根本不正确?
可以使用任何语言从每个队列中消费。这也是错误的,如果这是必需的,那么整个 design/setup 应该重新考虑。
评论后编辑:
基本上,您需要获取所有现有队列的名称,这可以通过 rest api (potentially even by calling rabbitmqctl and parsing the output). Once you have the names of the queues, you can simply consume from them as it is explained in the tutorial 以编程方式完成。
再一次,我认为这不是正确的方法,也许你应该考虑使用主题交换 - 我猜这是因为你写了 queues come and go
。
我正在尝试设置一个程序,该程序将从 RabbitMQ 中的每个队列中消费,并根据某些消息将 运行 某些脚本。不幸的是,如果 运行 出现单个错误(即超时或未找到队列),则在添加消费者时整个通道都死了。此外,队列来来去去,因此它必须经常刷新队列列表。这可能吗? 到目前为止,这是我的代码。
import pika
import requests
import sys
try:
host = sys.argv[1]
except:
host = "localhost"
def get_queues(host="localhost", port=15672, user="guest", passwd="guest", virtual_host=None):
url = 'http://%s:%s/api/queues/%s' % (host, port, virtual_host or '')
response = requests.get(url, auth=(user, passwd))
return response.json()
queues = get_queues(host)
def get_on_message(queue):
def on_message(channel, method_frame, header_frame, body):
print("message from", queue)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
return on_message
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
for queue in queues:
print(channel.is_open)
try:
channel.basic_consume(get_on_message(queue["name"]), queue["name"])
print("queue added",queue["name"])
except Exception as e:
print("queue failed",queue["name"])
sys.exit()
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
是否有正确的方法来做到这一点,或者这样做根本不正确?
可以使用任何语言从每个队列中消费。这也是错误的,如果这是必需的,那么整个 design/setup 应该重新考虑。
评论后编辑:
基本上,您需要获取所有现有队列的名称,这可以通过 rest api (potentially even by calling rabbitmqctl and parsing the output). Once you have the names of the queues, you can simply consume from them as it is explained in the tutorial 以编程方式完成。
再一次,我认为这不是正确的方法,也许你应该考虑使用主题交换 - 我猜这是因为你写了 queues come and go
。