除非我使用绑定,否则 Pyzmq SUB 不会接收消息

Pyzmq SUB doesn't receive messages unless I use bind

我有 3 个进程,我们称它们为 hostworker1worker2。我希望 worker1worker2 能够通过 PUB/SUB 套接字直接相互通信(host 间歇性插入),所以我有以下设置:

# host
socket = ctx.Socket(zmq.PUB)
socket.bind('ipc:///tmp/comms')

# worker1
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')

# worker2
socket = ctx.Socket(zmq.SUB)
socket.connect('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()

截至目前,此设置不起作用。 worker1 发送正常,但 worker2 似乎从未收到消息。但是,如果我现在将设置更改为:

# host
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')

# worker1
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.connect(b'worker1')

# worker2
socket = ctx.Socket(zmq.SUB)
socket.bind('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()

它工作得很好。但是,如果我也绑定 host,它会再次停止工作。

这是为什么?如果我现在有 workerN 也需要订阅 worker1,会发生什么情况,我如何绑定所有进程?这些 bind/connect 语义是什么? host 不是长期存在的进程,通过 binding 做正确的事情吗?如果是这样,为什么 worker2connect 时无法接收正在?


MWE:https://gist.github.com/ooblahman/f8f9724b9995b9646ebdb79d26afd68a

import zmq
import time
from multiprocessing import Process

addr = 'ipc:///tmp/test_zmq'

def worker1():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.connect(addr)
    while True:
        sock.send(b'worker1')
        print('worker1 sent')
        time.sleep(1)

def worker2():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(addr) # Change this to bind
    sock.setsockopt(zmq.SUBSCRIBE, b'worker1')
    while True:
        sock.recv()
        print('worker2 heard')

def main():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(addr) # Change this to connect (or remove entirely)

    p1 = Process(target=worker1)
    p2 = Process(target=worker2)
    p1.start()
    p2.start()

    p1.join()

if __name__ == '__main__':
    main()

Q : "Isn't host, being the long-lived process, doing the right thing by binding,...?"

好吧,没人能说得清
没有任何证据支持这种说法(""成为 long-lived 过程" ),任何此类扣除的假设越少 (".. .如果是这样,为什么在连接时worker2无法接收?)

我们可以告诉的是,
worker2可能会尝试使用这两种方法中的任何一种, 无论是 { .bind() | .connect() }, 成功, 但仍然没有任何保证 ( 一个很棒的 Zen-of-Zero ) 无论是否 POSACK 收到.这取决于许多其他因素,在 系统中更多:

  • 如果/何时/如何管理 TOPIC-列表(使用和调用 .setsockopt( zmq.SUBSCRIBE, ... ) 方法的顺序),
  • 如果 PUB 端代理确实发送过与任何主动订阅或未订阅的主题实际匹配的消息,
  • 如果默认和/或配置的资源实际上都处于一种状态,这足以在 .send()-er 端接受任何此类潜在可交付消息进行传输,并具有来自的所有必要传输方式sender-side Context()-实例到预期的 receiving-side Context()-实例的手中,最后,通过 .recv()-方法在任何地方进行本地交付有意receiver-side(s)与否,

    所以总的来说确实不好说。您的代码存在几个主要冲突:

worker1PUB 方面的角色实际上永远不会在上面的第一个示例中提供一点。


Q : "Why is this?"

要求 PUB 为另一个 PUB 设置一个 transport-link 不是合法的 ZeroMQ 正式通信原型模式——这样的对 .connect() 方法的调用,负责对于要求这样的设置,应该检查返回错误,故事的其余部分就像观察两个人,一个 PUB-in-host 和一个 PUB-in- worker1,他们都试图对着他们的麦克风大喊大叫,但从来没有人听过演讲者的讲话,所以从来没有听到其他人的声音。

可行,但不实用,是吗?


Q : "What happens if I now have workerN which also needs to subscribe to worker1, how can I bind from all the processes?"

我们不需要 .bind(),除非我们想要。

如果workerN想订阅worker1,它可能.bind(),但是worker1不知道有新的workerN出现(到它可能想要 .connect() ),是吗?因此 .connect() 通常用于 1:N-setup 的动态变化的 SUB-side N-ary 队列(实际上是 M:N,原则上) - 有已知 transport-class(es ... 是的,可以有多个 AccessPoints 使用多个,...是的,多个 transport-classes - 都可以得到 PUB-side从一个完全相同的 PUB-agent 用于与一个或多个远程 SUB-agents 通信 - 不是很酷吗?) - 正如 .bind()-(s) 建立的N 代理人在未来某个未知时间尝试 .connect() 的动态群组的已知访问点。