如何使用 pika 和 rabbitmq 的多线程来执行请求和响应 RPC 消息
how to use multithreading with pika and rabbitmq to perform Requests and Responses RPC Messages
我正在使用 Rabbitmq 开发一个项目,我正在使用 RPC 模式,基本上我正在接收或使用来自队列的消息,进行一些处理,然后发回响应。我正在使用 Pika,我的目标是为每个任务使用一个线程,因此对于每个任务,我都会为该任务创建一个线程。我还读到,最好的做法是只建立一个连接,并根据需要在它下面建立许多通道,但我总是得到这个错误:
'start_consuming may not be called from the scope of '
pika.exceptions.RecursionError:start_consuming 不能从另一个 BlockingConnection 或 BlockingChannel 回调的范围调用。
我做了一些研究,发现 Pika 不是线程安全的,我们应该为每个线程使用一个独立的连接和一个通道。但我不想这样做,因为它被认为是不好的做法。所以我想在这里问一下是否有人已经实现了这项工作。我还读到如果我不使用 BlockingConnection 来实例化我的连接是可能的,并且还有一个名为 add_callback_threadsafe 的函数可以使这成为可能。但不幸的是没有示例,我阅读了文档,但它很复杂,没有示例,我很难理解他们想要描述的内容。
我的尝试是声明两个 Classes。每个 class 将代表一个任务执行器,它接收或使用来自队列的消息,并基于该消息进行一些处理并返回响应。我的想法是在两个任务之间共享一个 rabbitmq 连接,但每个任务都会获得一个独立的通道。在上面的代码中,传递给函数的 rabbit 参数是一个 Class,它包含一些变量,如 Connection 和其他函数,如 EventSubscriber,当调用它时,它将分配一个新的 Channel 并开始使用来自特定 Exchange 和 routingKey 的消息。接下来我声明一个线程并将订阅或消费函数作为目标提供给该线程。另一个任务 Class 看起来也和这个 Class 一样,这就是为什么我只上传这个代码。在主要 Class 中,我连接到 rabbitmq 并将其作为参数传递给两个任务的构造函数 Classes.
class On_Deregistration:
def __init__(self, rabbit):
self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq
def event(self, rabbit):
self.Subscriber = rabbit.EventSubscriber(rabbit, 'testing.test', 'test', False, onDeregistrationFromHRS # this func is task listener)
def subscribeAsync(self):
self.Subscriber.subscribe() # here i call start_consuming
def start(self):
"""start Subscribtion in an Independant Thread """
thread = threading.Thread(target = self.subscribeAsync )
thread.start()
if thread.isAlive():
print("asynchronous subscription started")
MAin Class:
class 应用程序:
def __init__(self):
self.rabbitMq = RabbitMqCommunicationInterface(host='localhost', port=5672)
firstTask = On_Deregistration(self.rabbitMq)
secondTask = secondTask(self.rabbitMq)
app = App()
错误:'start_consuming may not be called from the scope of '
pika.exceptions.RecursionError: start_consuming 不能从另一个 BlockingConnection 或 BlockingChannel 回调的范围调用
我搜索了这个错误的原因,显然 pika 不是线程安全的,但必须有一个解决方案。也许不使用 BlockingConnection ?也许有人可以给我一个如何做到这一点的例子,因为我试过了但没有奏效。也许我遗漏了一些关于如何使用 rabbitmq
实现多线程的信息
经过长时间的研究,我发现 Pika 不是线程安全的。至少目前是这样,也许在新版本中它将是线程安全的。所以现在对于我的项目,我停止使用 Pika,我正在使用 b-rabbit,这是一个围绕 Rabbitpy 的线程安全包装器。但我必须说 Pika 是一个很棒的库,我发现 API 的描述和结构比 rabbitpy 更好,但对于我的项目来说,它必须使用多线程,这就是为什么 Pika 目前是一个糟糕的选择。我希望这对未来的人有所帮助
我正在使用 Rabbitmq 开发一个项目,我正在使用 RPC 模式,基本上我正在接收或使用来自队列的消息,进行一些处理,然后发回响应。我正在使用 Pika,我的目标是为每个任务使用一个线程,因此对于每个任务,我都会为该任务创建一个线程。我还读到,最好的做法是只建立一个连接,并根据需要在它下面建立许多通道,但我总是得到这个错误:
'start_consuming may not be called from the scope of '
pika.exceptions.RecursionError:start_consuming 不能从另一个 BlockingConnection 或 BlockingChannel 回调的范围调用。
我做了一些研究,发现 Pika 不是线程安全的,我们应该为每个线程使用一个独立的连接和一个通道。但我不想这样做,因为它被认为是不好的做法。所以我想在这里问一下是否有人已经实现了这项工作。我还读到如果我不使用 BlockingConnection 来实例化我的连接是可能的,并且还有一个名为 add_callback_threadsafe 的函数可以使这成为可能。但不幸的是没有示例,我阅读了文档,但它很复杂,没有示例,我很难理解他们想要描述的内容。
我的尝试是声明两个 Classes。每个 class 将代表一个任务执行器,它接收或使用来自队列的消息,并基于该消息进行一些处理并返回响应。我的想法是在两个任务之间共享一个 rabbitmq 连接,但每个任务都会获得一个独立的通道。在上面的代码中,传递给函数的 rabbit 参数是一个 Class,它包含一些变量,如 Connection 和其他函数,如 EventSubscriber,当调用它时,它将分配一个新的 Channel 并开始使用来自特定 Exchange 和 routingKey 的消息。接下来我声明一个线程并将订阅或消费函数作为目标提供给该线程。另一个任务 Class 看起来也和这个 Class 一样,这就是为什么我只上传这个代码。在主要 Class 中,我连接到 rabbitmq 并将其作为参数传递给两个任务的构造函数 Classes.
class On_Deregistration:
def __init__(self, rabbit):
self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq
def event(self, rabbit):
self.Subscriber = rabbit.EventSubscriber(rabbit, 'testing.test', 'test', False, onDeregistrationFromHRS # this func is task listener)
def subscribeAsync(self):
self.Subscriber.subscribe() # here i call start_consuming
def start(self):
"""start Subscribtion in an Independant Thread """
thread = threading.Thread(target = self.subscribeAsync )
thread.start()
if thread.isAlive():
print("asynchronous subscription started")
MAin Class:
class 应用程序:
def __init__(self):
self.rabbitMq = RabbitMqCommunicationInterface(host='localhost', port=5672)
firstTask = On_Deregistration(self.rabbitMq)
secondTask = secondTask(self.rabbitMq)
app = App()
错误:'start_consuming may not be called from the scope of '
pika.exceptions.RecursionError: start_consuming 不能从另一个 BlockingConnection 或 BlockingChannel 回调的范围调用
我搜索了这个错误的原因,显然 pika 不是线程安全的,但必须有一个解决方案。也许不使用 BlockingConnection ?也许有人可以给我一个如何做到这一点的例子,因为我试过了但没有奏效。也许我遗漏了一些关于如何使用 rabbitmq
实现多线程的信息经过长时间的研究,我发现 Pika 不是线程安全的。至少目前是这样,也许在新版本中它将是线程安全的。所以现在对于我的项目,我停止使用 Pika,我正在使用 b-rabbit,这是一个围绕 Rabbitpy 的线程安全包装器。但我必须说 Pika 是一个很棒的库,我发现 API 的描述和结构比 rabbitpy 更好,但对于我的项目来说,它必须使用多线程,这就是为什么 Pika 目前是一个糟糕的选择。我希望这对未来的人有所帮助