zmq多进程间通信

Communication between multiple processes with zmq


我有 n 个进程,它们有自己的本地数据和操作,我希望每个进程将其本地数据的 "snapshot" 发送到其余 运行 个节点进程。
到目前为止,我的代码如下所示:
def node1():
    Process(target=sync_1).start()
    sleep(4)
    data = {'node': 1, 'data': 'node 1 data'}

    context_b = zmq.Context()
    socket_b = context_b.socket(zmq.PUB)

    connnected = False
    try:
        socket_b.bind("tcp://*:%s" % 5560)
        connnected = True
    except Exception as e:
        print(e)
    if connnected:
        topic = "101"
        try:
            socket_b.send_string(topic + ' ' + json.dumps(data))
        except Exception as e:
                print(e)
    socket_b.close()
    context_b.term()

def node2():
    Process(target=sync_2).start()

def sync_1():
    context_c = zmq.Context()
    socket_c = context_c.socket(zmq.SUB)
    _port = 5560
    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter, encoding='utf-8')

     try:
         raw = socket_c.recv().decode("utf-8")
         json0 = raw.find('{')
         topic = raw[0:json0].strip()
         msg = json.loads(raw[json0:])
         print("[SYNC 1] received {}-{}]".format(topic, msg))
     except Exception as e:
         print(e)

def sync_2():
    context_c = zmq.Context()
    socket_c = context_c.socket(zmq.SUB)
    _port = 5560

    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter, encoding='utf-8')

    try:
        raw = socket_c.recv().decode("utf-8")
        json0 = raw.find('{')
        topic = raw[0:json0].strip()
        msg = json.loads(raw[json0:])
        print("[SYNC 2] received {}-{}]".format(topic, msg))
    except Exception as e:
        print(e)

if __name__ == '__main__':
    Process(target=node1).start()
    Process(target=node2).start()

每个节点在后台都有一个“侦听器”进程运行(同步功能)以接收每个节点数据并相应地使用它,当所有子套接字都连接到时它工作正常一个节点(在这种情况下为节点 1),但我希望每个节点都将数据发送到所有侦听器,所以我不确定如何实现这一点,因为侦听器进程可以连接到一个端口。

另外,每次有更新时,节点都必须发送本地数据快照,所以这不能是一次性通信,因此我想到让侦听器进程一直主动等待更新。

我相信图表可能对解决该问题有用:

可以有更简单的方法来解决这个问题,因此非常感谢任何帮助!

更新: 解决方案是使用 XPUB-XSUB 模式。
通过使用这种模式,我创建了一个代理线程,它允许我做我想做的事。
我能为 Python 找到的最有用的例子是 this .