使用来自 rabbitmq 的消息并通过其客户端连接转发消息的 'twisted' 方式是什么?
What is the 'twisted' way of consuming messages from rabbitmq and forwarding them through its client connections?
我正在twisted
写一个websocket服务器来学习框架。它将从 rabbitmq
代理接收消息,并向连接的客户端发送更新。如果我想通过多个客户端连接一次 broadcast/multi-cast 多条消息,调用(仅作为示例)deferToThread(channel.basic_consume, queue)
或 callInThread(" ")
是一个很好的选择吗?
如果不是,那么 twisted
使用来自 rabbitmq
的消息并将它们转发给连接的客户端的方法是什么?
到目前为止我的策略是:
reactor_thread:
侦听端口 (x) 以设置和维护客户端连接
other_thread:
订阅 rabbitmq 队列并使用消息(如果有)
(永远持续下去)
is calling (just as an example) deferToThread(channel.basic_consume, queue), or callInThread(" ") a very good option for doing so?
在这种情况下使用线程并不能真正提供太多好处,因为消息已经在 RabbitMQ 中排队。我过去也遇到过类似的情况,我可以向您简要概述我在不使用线程的情况下为解决问题所做的工作。免责声明:我有一年或两年没有使用 RabbitMQ 或 Websockets,所以我的知识可能有点模糊。
列出连接的客户端
假设您正在为 websockets 使用 autobahn
,您可以在工厂中添加一个变量 class (autobahn.twisted.websocket.WebSocketServerFactory
),它将跟踪连接的客户端。 list
或 dict
都可以。
factory = WebSocketServerFactory()
factory.connection_list = []
connection_list
变量将在建立连接后存储协议对象 (autobahn.twisted.websocket.WebSocketServerProtocol
)。在协议中,您需要重载 connectionMade
函数以将协议(在本例中为 self
)附加到 self.factory.connection_list
.
def connectionMade(self):
super(WSProtocol, self).connectionMade()
self.factory.connection_list.append(self)
为了灵活性,最好创建类似 "onConnect deferred" 的东西,但这就是它的要点。也许 autobahn
提供了一个接口来这样做。
RabbitMQ
使用 pika
,您可以使用此 example 异步使用消息。根据需要更改频道和交易所名称,使其适用于您的设置。然后我们将进行 2 处更改。首先我们将 factory.connection_list
传递给回调,然后当消息被消费时,我们会将其写入连接的客户端协议。
@defer.inlineCallbacks
def run(connection, proto_list):
#...
l = task.LoopingCall(read, queue_object, proto_list)
l.start(0.01)
@defer.inlineCallbacks
def read(queue_object, proto_list):
#...
if body:
print(body)
for client in sorted(proto_list):
yield client.write(body)
yield ch.basic_ack(delivery_tag=method.delivery_tag)
#...
d.addCallback(run, factory.connection_list)
reactor.run()
在read
回调函数中,每消费一条消息,循环任务就会遍历已连接的客户端列表,并向他们发送消息。
我正在twisted
写一个websocket服务器来学习框架。它将从 rabbitmq
代理接收消息,并向连接的客户端发送更新。如果我想通过多个客户端连接一次 broadcast/multi-cast 多条消息,调用(仅作为示例)deferToThread(channel.basic_consume, queue)
或 callInThread(" ")
是一个很好的选择吗?
如果不是,那么 twisted
使用来自 rabbitmq
的消息并将它们转发给连接的客户端的方法是什么?
到目前为止我的策略是:
reactor_thread: 侦听端口 (x) 以设置和维护客户端连接
other_thread: 订阅 rabbitmq 队列并使用消息(如果有) (永远持续下去)
is calling (just as an example) deferToThread(channel.basic_consume, queue), or callInThread(" ") a very good option for doing so?
在这种情况下使用线程并不能真正提供太多好处,因为消息已经在 RabbitMQ 中排队。我过去也遇到过类似的情况,我可以向您简要概述我在不使用线程的情况下为解决问题所做的工作。免责声明:我有一年或两年没有使用 RabbitMQ 或 Websockets,所以我的知识可能有点模糊。
列出连接的客户端
假设您正在为 websockets 使用 autobahn
,您可以在工厂中添加一个变量 class (autobahn.twisted.websocket.WebSocketServerFactory
),它将跟踪连接的客户端。 list
或 dict
都可以。
factory = WebSocketServerFactory()
factory.connection_list = []
connection_list
变量将在建立连接后存储协议对象 (autobahn.twisted.websocket.WebSocketServerProtocol
)。在协议中,您需要重载 connectionMade
函数以将协议(在本例中为 self
)附加到 self.factory.connection_list
.
def connectionMade(self):
super(WSProtocol, self).connectionMade()
self.factory.connection_list.append(self)
为了灵活性,最好创建类似 "onConnect deferred" 的东西,但这就是它的要点。也许 autobahn
提供了一个接口来这样做。
RabbitMQ
使用 pika
,您可以使用此 example 异步使用消息。根据需要更改频道和交易所名称,使其适用于您的设置。然后我们将进行 2 处更改。首先我们将 factory.connection_list
传递给回调,然后当消息被消费时,我们会将其写入连接的客户端协议。
@defer.inlineCallbacks
def run(connection, proto_list):
#...
l = task.LoopingCall(read, queue_object, proto_list)
l.start(0.01)
@defer.inlineCallbacks
def read(queue_object, proto_list):
#...
if body:
print(body)
for client in sorted(proto_list):
yield client.write(body)
yield ch.basic_ack(delivery_tag=method.delivery_tag)
#...
d.addCallback(run, factory.connection_list)
reactor.run()
在read
回调函数中,每消费一条消息,循环任务就会遍历已连接的客户端列表,并向他们发送消息。