订阅多个进程后处于奇怪状态的pyzmq代理
pyzmq proxy in a strange state after subscribing multiple processes
我在 pyzmq 中遇到了一个奇怪的代理问题。这是该代理的代码:
import zmq
context = zmq.Context.instance()
frontend_socket = context.socket(zmq.XSUB)
frontend_socket.bind("tcp://0.0.0.0:%s" % sub_port)
backend_socket = context.socket(zmq.XPUB)
backend_socket.bind("tcp://0.0.0.0:%s" % pub_port)
zmq.proxy(frontend_socket, backend_socket)
我正在使用该代理在 6 台不同机器上 运行 的约 50 个进程之间发送消息。主题总量在1000左右,但是由于多个进程可以监听同一个主题,所以订阅总量在10000左右
在正常情况下,这非常有效,只要一个进程发布消息并且至少有一个其他进程订阅了该主题,消息就会正确地通过代理。无论是先启动发布者还是订阅者,它都有效。
但是在某个时间点,当我们启动一个新进程(我们称它为 X)时,它开始表现得很奇怪。已经连接的所有东西都在继续工作,但是我们连接的新进程只有在发布者先于订阅者连接的情况下才能让消息通过。 X可以是任何一个正常工作的进程,也可以来自任何一台机器,结果都是一样的。当我们进入这种状态时,杀死 X 会使一切重新开始,而再次启动它会使它失败。如果我们停止其他进程然后启动X,它运行良好(所以它与X的代码没有特别相关)。
我不确定我们是否会达到 ZMQ 的某个限制?我读过一些人的例子,他们似乎比我们拥有更多的流程、订阅等。这可能是我们应该在代理上设置的一些选项,到目前为止,这里是我们尝试过但没有成功的选项:
- 在 frontend_socket
上更改 RCVHWM
- 在 backend_socket
上更改 SNDHWM
- 在 backend_socket
上设置 XPUB_VERBOSE
- 在 backend_socket
上设置 XPUB_VERBOSER
以下是我们如何向代理发布消息的示例代码:
topic = "test"
message = {"test": "test"}
context = zmq.Context.instance()
socket = context.socket(zmq.PUB)
socket.connect("tcp://1.2.3.4:1234")
while True:
time.sleep(1)
socket.send_multipart([topic.encode(), json.dumps(message).encode()])
以下是我们如何从代理订阅消息的示例代码:
topic = "test"
context = zmq.Context.instance()
socket = context.socket(zmq.SUB)
socket.connect("tcp://1.2.3.4:5678")
socket.subscribe(topic)
while True:
multi_part = socket.recv_multipart()
[topic, message] = multi_part
print(topic.decode(), message.decode())
有没有人见过类似的问题?我们可以做些什么来避免代理进入这种状态吗?
谢谢!
使所有发布者(代理和发布过程)XPUB(+ sockopt verbose/verboser)然后在轮询循环中从发布者套接字读取。订阅消息的第一个字节将告诉您消息是否为 sub/unsub,后跟 subject/topic。如果您记录所有带有时间戳的信息,它应该会告诉您哪个组件有问题(可能是三个组件中的任何一个)并帮助修复。
到达发布者 (XPUB) 的订阅消息的格式将是
- 订阅
[0x01][topic]
- 退订
[0x00][topic]
需要代码
我通常使用 C++,但这是 python
中的总体思路
代理
您需要创建一个捕获套接字(这就像一个网络分流器)。您通过 inproc 将 ZMQ_PAIR 套接字连接到代理(捕获),然后读取套接字另一端的内容。当您使用 XPUB/XSUB 时,您将看到订阅消息。
zmq.proxy(frontend, backend, capture)
阅读 docs/examples python 代理。
出版商
在这种情况下,您需要在发送时在同一线程中从发布套接字读取数据。这就是我说轮询循环可能是最好的原因。
此代码根本没有经过测试。
topic = "test"
message = {"test": "test"}
context = zmq.Context.instance()
socket = context.socket(zmq.XPUB)
socket.connect("tcp://1.2.3.4:1234")
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
timeout = 1000 #ms
while True:
socks = dict(poller.poll(timeout))
if not socks : # 1
socket.send_multipart([topic.encode(), json.dumps(message).encode()])
if socket in socks:
sub_msg = socket.recv()
# print out the message here.
我在 pyzmq 中遇到了一个奇怪的代理问题。这是该代理的代码:
import zmq
context = zmq.Context.instance()
frontend_socket = context.socket(zmq.XSUB)
frontend_socket.bind("tcp://0.0.0.0:%s" % sub_port)
backend_socket = context.socket(zmq.XPUB)
backend_socket.bind("tcp://0.0.0.0:%s" % pub_port)
zmq.proxy(frontend_socket, backend_socket)
我正在使用该代理在 6 台不同机器上 运行 的约 50 个进程之间发送消息。主题总量在1000左右,但是由于多个进程可以监听同一个主题,所以订阅总量在10000左右
在正常情况下,这非常有效,只要一个进程发布消息并且至少有一个其他进程订阅了该主题,消息就会正确地通过代理。无论是先启动发布者还是订阅者,它都有效。
但是在某个时间点,当我们启动一个新进程(我们称它为 X)时,它开始表现得很奇怪。已经连接的所有东西都在继续工作,但是我们连接的新进程只有在发布者先于订阅者连接的情况下才能让消息通过。 X可以是任何一个正常工作的进程,也可以来自任何一台机器,结果都是一样的。当我们进入这种状态时,杀死 X 会使一切重新开始,而再次启动它会使它失败。如果我们停止其他进程然后启动X,它运行良好(所以它与X的代码没有特别相关)。
我不确定我们是否会达到 ZMQ 的某个限制?我读过一些人的例子,他们似乎比我们拥有更多的流程、订阅等。这可能是我们应该在代理上设置的一些选项,到目前为止,这里是我们尝试过但没有成功的选项:
- 在 frontend_socket 上更改 RCVHWM
- 在 backend_socket 上更改 SNDHWM
- 在 backend_socket 上设置 XPUB_VERBOSE
- 在 backend_socket 上设置 XPUB_VERBOSER
以下是我们如何向代理发布消息的示例代码:
topic = "test"
message = {"test": "test"}
context = zmq.Context.instance()
socket = context.socket(zmq.PUB)
socket.connect("tcp://1.2.3.4:1234")
while True:
time.sleep(1)
socket.send_multipart([topic.encode(), json.dumps(message).encode()])
以下是我们如何从代理订阅消息的示例代码:
topic = "test"
context = zmq.Context.instance()
socket = context.socket(zmq.SUB)
socket.connect("tcp://1.2.3.4:5678")
socket.subscribe(topic)
while True:
multi_part = socket.recv_multipart()
[topic, message] = multi_part
print(topic.decode(), message.decode())
有没有人见过类似的问题?我们可以做些什么来避免代理进入这种状态吗?
谢谢!
使所有发布者(代理和发布过程)XPUB(+ sockopt verbose/verboser)然后在轮询循环中从发布者套接字读取。订阅消息的第一个字节将告诉您消息是否为 sub/unsub,后跟 subject/topic。如果您记录所有带有时间戳的信息,它应该会告诉您哪个组件有问题(可能是三个组件中的任何一个)并帮助修复。
到达发布者 (XPUB) 的订阅消息的格式将是
- 订阅
[0x01][topic]
- 退订
[0x00][topic]
需要代码
我通常使用 C++,但这是 python
中的总体思路代理
您需要创建一个捕获套接字(这就像一个网络分流器)。您通过 inproc 将 ZMQ_PAIR 套接字连接到代理(捕获),然后读取套接字另一端的内容。当您使用 XPUB/XSUB 时,您将看到订阅消息。
zmq.proxy(frontend, backend, capture)
阅读 docs/examples python 代理。
出版商
在这种情况下,您需要在发送时在同一线程中从发布套接字读取数据。这就是我说轮询循环可能是最好的原因。
此代码根本没有经过测试。
topic = "test"
message = {"test": "test"}
context = zmq.Context.instance()
socket = context.socket(zmq.XPUB)
socket.connect("tcp://1.2.3.4:1234")
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
timeout = 1000 #ms
while True:
socks = dict(poller.poll(timeout))
if not socks : # 1
socket.send_multipart([topic.encode(), json.dumps(message).encode()])
if socket in socks:
sub_msg = socket.recv()
# print out the message here.