ZMQ Python 处理异步请求的请求回复代理

ZMQ Python Request Reply Broker for Addressed Asynchronous Requests

我想利用 ZMQ 实现(在 python 中)代理和客户端异步处理对寻址实体的请求-回复。客户端包含执行请求和回复的功能(唯一缺少的是确切的socket-type/pattern)。

请求可以阻塞,但回复端需要能够处理传入的并行(线程)请求。(即 REP 套接字不够好,因为它需要在下一次接收之前发送)

它需要通过代理,因为会有许多可能的实体可以进行请求和回复,而我只想绑定一组端口(不是每个实体一个)。

Entity1                Broker                    Entity2
  REQ ------------- ROUTER ?????? -------------- ??????

Entity1 将知道 Entity2 的 ID 并使用它来确保特定地向 Entity2 发出请求。可以有任意数量的实体,但所有应响应请求的实体都将注册 ID。

我试过在上面代理的右侧使用 DEALER,但它似乎只会发送循环请求。

所以有人知道我可以用来异步处理特定实体的好的 pattern/set 套接字吗?

总结:

我已经非常广泛地阅读了 ZMQ 手册,但我还没有找到任何真正好的模式来通过代理寻址特定套接字,因此非常感谢任何帮助。

经过进一步的研究和测试,我发现了一种似乎可以满足我所有需求的模式。

模式

Requester               Broker                   Replier
  REQ ------------- ROUTER ROUTER -------------- DEALER
                (requests) (replies)

请求者

客户端的请求端简单地连接到代理上的请求路由器,发送请求并开始读取套接字以获取回复:

reqSocket.connect(self._reqAddress)
reqSocket.send_multipart([repId, message])
reply = reqSocket.recv_multipart()[0]

回复者 ID 作为消息的第一部分包含在内,例如:

Outgoing message: ['replierId', 'requestMsg']

请求路由器

if self.reqRouterSocket in socketEvents:
    multipart = self.reqRouterSocket.recv_multipart()
    multipart = [multipart[-2]] + multipart
    del multipart[-2]
    self.repRouterSocket.send_multipart(multipart)

即,请求路由器只是移动有效负载的第一部分(即 replierId)并将其放在地址堆栈的第一位:

Incoming message: ['reqSocketAddr', '', 'replierId', 'requestMsg']
Outgoing message: ['replierId', 'reqSocketAddr', '', 'requestMsg']

传出消息是从回复路由器发送的。由于回复者将其套接字 ID 设置为 'replierId' 并已连接到回复者路由器,该路由器识别此地址并能够成功传递请求。

回复者

回复者需要将其自己的套接字标识设置为某个已知值,以便如上所述直接寻址。

注意: 在执行到回复路由器的连接之前,您必须设置 DEALER 套接字的套接字 ID。设置套接字的身份:

self.dealerSocket.setsockopt(zmq.IDENTITY, 'replierId')

否则路由器将不知道 id 并会抛出消息。

回复者侦听传入的请求。在我的例子中,这都是线程化的,请求是异步处理的。这就是使用 DEALER 套接字而不是常规 REP 的原因,这在同步情况下会容易得多。 DEALER 套接字可以接收更多请求,而不必先回答第一个请求,而 REP 必须这样做。然而,在回复方所做的简化版本是:

multipart = self.dealerSocket.recv_multipart()
returnRoute = multipart[:-1]
requestMsg = multipart[-1]
reply = someFunction(requestMsg)
self.dealerSocket.send_multipart(returnRoute + [reply])

即,回复者只是 returns 它得到了什么,但是请求更改为回复:

Incoming message: ['replierId', 'reqSocketAddr', '', 'request']
Outgoing message: ['replierId', 'reqSocketAddr', '', 'reply']

然后将此传出消息发送回回复路由器。

回复路由器

在代理的这一端选择路由器纯粹是因为它需要在许多已连接的套接字中寻址特定套接字的功能。

if self.repRouterSocket in socketEvents:
    multipart = self.repRouterSocket.recv_multipart()
    self.reqRouterSocket.send_multipart(multipart[1:])

即弹出地址栈的首地址,将报文再次发送给请求方。

Incoming message: ['replierId', 'reqSocketAddr', '', 'reply']
Outgoing message: ['reqSocketAddr', '', 'reply']

请求路由器识别此地址并将请求发送回接收到的请求者:

Incoming list: ['reply']

这个模式似乎满足了我在问题中提出的要求。希望对其他人也有用。