订阅多个进程后处于奇怪状态的pyzmq代理

pyzmq proxy in a strange state after subscribing multiple processes

我在 pyzmq 中遇到了一个奇怪的代理问题。这是该代理的代码:

import zmq
context = zmq.Context.instance()

frontend_socket = context.socket(zmq.XSUB)
frontend_socket.bind("tcp://0.0.0.0:%s" % sub_port)

backend_socket = context.socket(zmq.XPUB)
backend_socket.bind("tcp://0.0.0.0:%s" % pub_port)

zmq.proxy(frontend_socket, backend_socket)

我正在使用该代理在 6 台不同机器上 运行 的约 50 个进程之间发送消息。主题总量在1000左右,但是由于多个进程可以监听同一个主题,所以订阅总量在10000左右

在正常情况下,这非常有效,只要一个进程发布消息并且至少有一个其他进程订阅了该主题,消息就会正确地通过代理。无论是先启动发布者还是订阅者,它都有效。

但是在某个时间点,当我们启动一个新进程(我们称它为 X)时,它开始表现得很奇怪。已经连接的所有东西都在继续工作,但是我们连接的新进程只有在发布者先于订阅者连接的情况下才能让消息通过。 X可以是任何一个正常工作的进程,也可以来自任何一台机器,结果都是一样的。当我们进入这种状态时,杀死 X 会使一切重新开始,而再次启动它会使它失败。如果我们停止其他进程然后启动X,它运行良好(所以它与X的代码没有特别相关)。

我不确定我们是否会达到 ZMQ 的某个限制?我读过一些人的例子,他们似乎比我们拥有更多的流程、订阅等。这可能是我们应该在代理上设置的一些选项,到目前为止,这里是我们尝试过但没有成功的选项:

以下是我们如何向代理发布消息的示例代码:

topic = "test"
message = {"test": "test"}

context = zmq.Context.instance()
socket = context.socket(zmq.PUB)
socket.connect("tcp://1.2.3.4:1234")
while True:
    time.sleep(1)
    socket.send_multipart([topic.encode(), json.dumps(message).encode()])

以下是我们如何从代理订阅消息的示例代码:

topic = "test"
context = zmq.Context.instance()
socket = context.socket(zmq.SUB)
socket.connect("tcp://1.2.3.4:5678")
socket.subscribe(topic)

while True:
    multi_part = socket.recv_multipart()
    [topic, message] = multi_part
    print(topic.decode(), message.decode())

有没有人见过类似的问题?我们可以做些什么来避免代理进入这种状态吗?

谢谢!

使所有发布者(代理和发布过程)XPUB(+ sockopt verbose/verboser)然后在轮询循环中从发布者套接字读取。订阅消息的第一个字节将告诉您消息是否为 sub/unsub,后跟 subject/topic。如果您记录所有带有时间戳的信息,它应该会告诉您哪个组件有问题(可能是三个组件中的任何一个)并帮助修复。

到达发布者 (XPUB) 的订阅消息的格式将是

  • 订阅[0x01][topic]
  • 退订[0x00][topic]

需要代码

我通常使用 C++,但这是 python

中的总体思路

代理

您需要创建一个捕获套接字(这就像一个网络分流器)。您通过 inproc 将 ZMQ_PAIR 套接字连接到代理(捕获),然后读取套接字另一端的内容。当您使用 XPUB/XSUB 时,您将看到订阅消息。

zmq.proxy(frontend, backend, capture)

阅读 docs/examples python 代理。

出版商

在这种情况下,您需要在发送时在同一线程中从发布套接字读取数据。这就是我说轮询循环可能是最好的原因。

此代码根本没有经过测试。

topic = "test"
message = {"test": "test"}

context = zmq.Context.instance()
socket = context.socket(zmq.XPUB)
socket.connect("tcp://1.2.3.4:1234")

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
timeout = 1000  #ms

while True:
  socks = dict(poller.poll(timeout))
  if not socks : # 1
    socket.send_multipart([topic.encode(), json.dumps(message).encode()])
  if socket in socks:
    sub_msg = socket.recv()  
    # print out the message here.