如何设置 RabbitMQ 消费者从非空队列消费?
How do I set up a RabbitMQ consumer to consume from a nonempty queue?
我目前正在 Python 中使用 RabbitMQ,使用 Pika 客户端创建一个处理各种消息类型的服务器。我的基本设置是一个接收所有传入消息的队列,一个将它们定向到正确目的地的路由进程,以及几个处理请求和接受传入数据的进程。除了在一个特定情况下,此设置一直运行良好。当我在服务器进程启动之前有 RabbitMQ 服务器 运行 并且它收到一条消息时,它会将这些消息正确地存储在传入消息队列中。但是,当我随后尝试启动这些进程并使用 pika.basic_consume 函数为该非空传入队列设置消费者时,程序挂起。因此,目前如果我想启动我的服务器进程,我必须在它正常工作之前清除队列中的所有消息。我如何解决此问题以使用非空队列?
这是其中一个流程的示例,它们的设置与本流程基本相同。
class Router(Process):
def __init__(self,routing_table):
super(Router,self).__init__()
self.routing_table = routing_table
self.routeQueues = {
'r' : 'registration',
't' : 'util',
'p' : 'util',
's' : 'data'
}
# Create a connection to the RabbitMQ server.
self.rabbitConn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.rabbitConn.channel()
# Load all of the existing registered node queues
with open('registrations/nodes.txt','r') as nodes:
for line in nodes:
info = line.strip().split(":")
self.channel.queue_declare(info[1])
# Declare the default queues
queue_list = ["incoming","registration","util"]
for queueName in queue_list:
self.channel.queue_declare(queueName)
# Start consuming things from the incoming queue
self.channel.basic_consume(self.gotPacket,queue='incoming')
def gotPacket(self,ch,method,params,body):
# Does stuff. Not relevant here.
pass
def run(self):
self.channel.start_consuming()
此问题是由 pika 0.9.13 库引起的。升级到 pika 0.9.14 可以解决这个问题。 @eandersson
我目前正在 Python 中使用 RabbitMQ,使用 Pika 客户端创建一个处理各种消息类型的服务器。我的基本设置是一个接收所有传入消息的队列,一个将它们定向到正确目的地的路由进程,以及几个处理请求和接受传入数据的进程。除了在一个特定情况下,此设置一直运行良好。当我在服务器进程启动之前有 RabbitMQ 服务器 运行 并且它收到一条消息时,它会将这些消息正确地存储在传入消息队列中。但是,当我随后尝试启动这些进程并使用 pika.basic_consume 函数为该非空传入队列设置消费者时,程序挂起。因此,目前如果我想启动我的服务器进程,我必须在它正常工作之前清除队列中的所有消息。我如何解决此问题以使用非空队列?
这是其中一个流程的示例,它们的设置与本流程基本相同。
class Router(Process):
def __init__(self,routing_table):
super(Router,self).__init__()
self.routing_table = routing_table
self.routeQueues = {
'r' : 'registration',
't' : 'util',
'p' : 'util',
's' : 'data'
}
# Create a connection to the RabbitMQ server.
self.rabbitConn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.rabbitConn.channel()
# Load all of the existing registered node queues
with open('registrations/nodes.txt','r') as nodes:
for line in nodes:
info = line.strip().split(":")
self.channel.queue_declare(info[1])
# Declare the default queues
queue_list = ["incoming","registration","util"]
for queueName in queue_list:
self.channel.queue_declare(queueName)
# Start consuming things from the incoming queue
self.channel.basic_consume(self.gotPacket,queue='incoming')
def gotPacket(self,ch,method,params,body):
# Does stuff. Not relevant here.
pass
def run(self):
self.channel.start_consuming()
此问题是由 pika 0.9.13 库引起的。升级到 pika 0.9.14 可以解决这个问题。 @eandersson