除非我使用绑定,否则 Pyzmq SUB 不会接收消息
Pyzmq SUB doesn't receive messages unless I use bind
我有 3 个进程,我们称它们为 host
、worker1
和 worker2
。我希望 worker1
和 worker2
能够通过 PUB/SUB 套接字直接相互通信(host
间歇性插入),所以我有以下设置:
# host
socket = ctx.Socket(zmq.PUB)
socket.bind('ipc:///tmp/comms')
# worker1
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')
# worker2
socket = ctx.Socket(zmq.SUB)
socket.connect('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
截至目前,此设置不起作用。 worker1
发送正常,但 worker2
似乎从未收到消息。但是,如果我现在将设置更改为:
# host
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
# worker1
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.connect(b'worker1')
# worker2
socket = ctx.Socket(zmq.SUB)
socket.bind('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
它工作得很好。但是,如果我也绑定 host
,它会再次停止工作。
这是为什么?如果我现在有 workerN
也需要订阅 worker1
,会发生什么情况,我如何绑定所有进程?这些 bind/connect 语义是什么? host
不是长期存在的进程,通过 bind
ing 做正确的事情吗?如果是这样,为什么 worker2
在 connect
时无法接收正在?
MWE:https://gist.github.com/ooblahman/f8f9724b9995b9646ebdb79d26afd68a
import zmq
import time
from multiprocessing import Process
addr = 'ipc:///tmp/test_zmq'
def worker1():
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.connect(addr)
while True:
sock.send(b'worker1')
print('worker1 sent')
time.sleep(1)
def worker2():
ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.connect(addr) # Change this to bind
sock.setsockopt(zmq.SUBSCRIBE, b'worker1')
while True:
sock.recv()
print('worker2 heard')
def main():
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind(addr) # Change this to connect (or remove entirely)
p1 = Process(target=worker1)
p2 = Process(target=worker2)
p1.start()
p2.start()
p1.join()
if __name__ == '__main__':
main()
Q : "Isn't host
, being the long-lived process, doing the right thing by bind
ing,...?"
好吧,没人能说得清。
没有任何证据支持这种说法(""成为 long-lived 过程" ),任何此类扣除的假设越少 (".. .如果是这样,为什么在连接时worker2
无法接收?)
我们可以告诉的是,
worker2
可能会尝试使用这两种方法中的任何一种, 无论是 { .bind() | .connect() }
, 成功, 但仍然没有任何保证 ( 一个很棒的 Zen-of-Zero ) 无论是否 POSACK
收到.这取决于许多其他因素,在 distributed-computing 系统中更多:
- 如果/何时/如何管理
TOPIC
-列表(使用和调用 .setsockopt( zmq.SUBSCRIBE, ... )
方法的顺序),
- 如果
PUB
端代理确实发送过与任何主动订阅或未订阅的主题实际匹配的消息,
- 如果默认和/或配置的资源实际上都处于一种状态,这足以在
.send()
-er 端接受任何此类潜在可交付消息进行传输,并具有来自的所有必要传输方式sender-side Context()
-实例到预期的 receiving-side Context()
-实例的手中,最后,通过 .recv()
-方法在任何地方进行本地交付有意receiver-side(s)与否,
所以总的来说确实不好说。您的代码存在几个主要冲突:
worker1
在 PUB
方面的角色实际上永远不会在上面的第一个示例中提供一点。
Q : "Why is this?"
要求 PUB
为另一个 PUB
设置一个 transport-link 不是合法的 ZeroMQ 正式通信原型模式——这样的对 .connect()
方法的调用,负责对于要求这样的设置,应该检查返回错误,故事的其余部分就像观察两个人,一个 PUB
-in-host
和一个 PUB
-in- worker1
,他们都试图对着他们的麦克风大喊大叫,但从来没有人听过演讲者的讲话,所以从来没有听到其他人的声音。
可行,但不实用,是吗?
Q : "What happens if I now have workerN
which also needs to subscribe to worker1
, how can I bind from all the processes?"
我们不需要 .bind()
,除非我们想要。
如果workerN
想订阅worker1
,它可能.bind()
,但是worker1
不知道有新的workerN
出现(到它可能想要 .connect()
),是吗?因此 .connect()
通常用于 1:N
-setup 的动态变化的 SUB
-side N-ary 队列(实际上是 M:N
,原则上) - 有已知 transport-class(es ... 是的,可以有多个 AccessPoints 使用多个,...是的,多个 transport-classes - 都可以得到 PUB
-side从一个完全相同的 PUB
-agent 用于与一个或多个远程 SUB
-agents 通信 - 不是很酷吗?) - 正如 .bind()
-(s) 建立的N
代理人在未来某个未知时间尝试 .connect()
的动态群组的已知访问点。
我有 3 个进程,我们称它们为 host
、worker1
和 worker2
。我希望 worker1
和 worker2
能够通过 PUB/SUB 套接字直接相互通信(host
间歇性插入),所以我有以下设置:
# host
socket = ctx.Socket(zmq.PUB)
socket.bind('ipc:///tmp/comms')
# worker1
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')
# worker2
socket = ctx.Socket(zmq.SUB)
socket.connect('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
截至目前,此设置不起作用。 worker1
发送正常,但 worker2
似乎从未收到消息。但是,如果我现在将设置更改为:
# host
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
# worker1
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.connect(b'worker1')
# worker2
socket = ctx.Socket(zmq.SUB)
socket.bind('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
它工作得很好。但是,如果我也绑定 host
,它会再次停止工作。
这是为什么?如果我现在有 workerN
也需要订阅 worker1
,会发生什么情况,我如何绑定所有进程?这些 bind/connect 语义是什么? host
不是长期存在的进程,通过 bind
ing 做正确的事情吗?如果是这样,为什么 worker2
在 connect
时无法接收正在?
MWE:https://gist.github.com/ooblahman/f8f9724b9995b9646ebdb79d26afd68a
import zmq
import time
from multiprocessing import Process
addr = 'ipc:///tmp/test_zmq'
def worker1():
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.connect(addr)
while True:
sock.send(b'worker1')
print('worker1 sent')
time.sleep(1)
def worker2():
ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.connect(addr) # Change this to bind
sock.setsockopt(zmq.SUBSCRIBE, b'worker1')
while True:
sock.recv()
print('worker2 heard')
def main():
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind(addr) # Change this to connect (or remove entirely)
p1 = Process(target=worker1)
p2 = Process(target=worker2)
p1.start()
p2.start()
p1.join()
if __name__ == '__main__':
main()
Q : "Isn't
host
, being the long-lived process, doing the right thing bybind
ing,...?"
好吧,没人能说得清。
没有任何证据支持这种说法(""成为 long-lived 过程" ),任何此类扣除的假设越少 (".. .如果是这样,为什么在连接时worker2
无法接收?)
我们可以告诉的是,worker2
可能会尝试使用这两种方法中的任何一种, 无论是 { .bind() | .connect() }
, 成功, 但仍然没有任何保证 ( 一个很棒的 Zen-of-Zero ) 无论是否 POSACK
收到.这取决于许多其他因素,在 distributed-computing 系统中更多:
- 如果/何时/如何管理
TOPIC
-列表(使用和调用.setsockopt( zmq.SUBSCRIBE, ... )
方法的顺序), - 如果
PUB
端代理确实发送过与任何主动订阅或未订阅的主题实际匹配的消息, - 如果默认和/或配置的资源实际上都处于一种状态,这足以在
.send()
-er 端接受任何此类潜在可交付消息进行传输,并具有来自的所有必要传输方式sender-sideContext()
-实例到预期的 receiving-sideContext()
-实例的手中,最后,通过.recv()
-方法在任何地方进行本地交付有意receiver-side(s)与否,
所以总的来说确实不好说。您的代码存在几个主要冲突:
worker1
在 PUB
方面的角色实际上永远不会在上面的第一个示例中提供一点。
Q : "Why is this?"
要求 PUB
为另一个 PUB
设置一个 transport-link 不是合法的 ZeroMQ 正式通信原型模式——这样的对 .connect()
方法的调用,负责对于要求这样的设置,应该检查返回错误,故事的其余部分就像观察两个人,一个 PUB
-in-host
和一个 PUB
-in- worker1
,他们都试图对着他们的麦克风大喊大叫,但从来没有人听过演讲者的讲话,所以从来没有听到其他人的声音。
可行,但不实用,是吗?
Q : "What happens if I now have
workerN
which also needs to subscribe toworker1
, how can I bind from all the processes?"
我们不需要 .bind()
,除非我们想要。
如果workerN
想订阅worker1
,它可能.bind()
,但是worker1
不知道有新的workerN
出现(到它可能想要 .connect()
),是吗?因此 .connect()
通常用于 1:N
-setup 的动态变化的 SUB
-side N-ary 队列(实际上是 M:N
,原则上) - 有已知 transport-class(es ... 是的,可以有多个 AccessPoints 使用多个,...是的,多个 transport-classes - 都可以得到 PUB
-side从一个完全相同的 PUB
-agent 用于与一个或多个远程 SUB
-agents 通信 - 不是很酷吗?) - 正如 .bind()
-(s) 建立的N
代理人在未来某个未知时间尝试 .connect()
的动态群组的已知访问点。