Communication between multiple processes with zmq
I have n processes, each with its own local data and operations, and I want every process to send a "snapshot" of its local data to the rest of the running node processes.
My code so far looks like this:
import json
from multiprocessing import Process
from time import sleep

import zmq


def node1():
    # Start this node's listener, then give the other listeners time to connect.
    Process(target=sync_1).start()
    sleep(4)

    data = {'node': 1, 'data': 'node 1 data'}

    context_b = zmq.Context()
    socket_b = context_b.socket(zmq.PUB)
    connected = False
    try:
        socket_b.bind("tcp://*:%s" % 5560)
        connected = True
    except Exception as e:
        print(e)

    if connected:
        topic = "101"
        try:
            # Prefix the JSON payload with the topic so the SUB filter matches.
            socket_b.send_string(topic + ' ' + json.dumps(data))
        except Exception as e:
            print(e)

    socket_b.close()
    context_b.term()


def node2():
    Process(target=sync_2).start()


def sync_1():
    context_c = zmq.Context()
    socket_c = context_c.socket(zmq.SUB)
    _port = 5560
    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter)

    try:
        # Split the incoming message back into topic and JSON payload.
        raw = socket_c.recv().decode("utf-8")
        json0 = raw.find('{')
        topic = raw[0:json0].strip()
        msg = json.loads(raw[json0:])
        print("[SYNC 1] received {} - {}".format(topic, msg))
    except Exception as e:
        print(e)


def sync_2():
    context_c = zmq.Context()
    socket_c = context_c.socket(zmq.SUB)
    _port = 5560
    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter)

    try:
        raw = socket_c.recv().decode("utf-8")
        json0 = raw.find('{')
        topic = raw[0:json0].strip()
        msg = json.loads(raw[json0:])
        print("[SYNC 2] received {} - {}".format(topic, msg))
    except Exception as e:
        print(e)


if __name__ == '__main__':
    Process(target=node1).start()
    Process(target=node2).start()
Each node runs a "listener" process in the background (the sync functions) to receive the other nodes' data and act on it accordingly. This works fine as long as all the SUB sockets connect to a single node (node 1 in this case), but I want every node to send its data to all the listeners, and I am not sure how to achieve that, since a listener process can only connect to one port.
Also, a node has to send a snapshot of its local data every time there is an update, so this cannot be a one-off exchange; that is why I have the listener processes actively waiting for updates all the time.
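(As an aside, a single SUB socket can connect to more than one endpoint, so one broker-less variant is to let every node bind its PUB socket on its own port and have each listener connect to all of them. A minimal sketch of that idea, where the ports 5560 and 5561 are placeholder port numbers I chose for node 1 and node 2 rather than values from the code above:)

import zmq

context = zmq.Context()
socket_c = context.socket(zmq.SUB)

# A SUB socket may connect to several publishers at once; here each node
# is assumed to bind its own PUB socket on a distinct, well-known port.
socket_c.connect("tcp://localhost:5560")  # node 1's publisher
socket_c.connect("tcp://localhost:5561")  # node 2's publisher

socket_c.setsockopt_string(zmq.SUBSCRIBE, "101")

while True:
    raw = socket_c.recv_string()
    print("received:", raw)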
I think a diagram might help explain the problem:
There may well be a simpler way to approach this, so any help is greatly appreciated!
Update:
The solution was to use the XPUB-XSUB pattern.
Using this pattern, I created a proxy thread that lets me do exactly what I wanted.
The most useful example I could find for Python is this.
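For reference, here is a minimal sketch of the XPUB-XSUB proxy idea as I understand it; the port numbers (5559/5560), the thread layout and the helper names are my own assumptions, not code from the linked example. A proxy thread binds an XSUB socket that every node's PUB socket connects to and an XPUB socket that every listener's SUB socket connects to, and zmq.proxy forwards traffic between them:

import json
import threading
from time import sleep

import zmq

# Hypothetical well-known broker ports (not taken from the original code).
XSUB_PORT = 5559   # every node's PUB socket connects here
XPUB_PORT = 5560   # every listener's SUB socket connects here


def broker():
    # XSUB <-> XPUB proxy: forwards data from all publishers to all
    # subscribers, and subscription messages in the opposite direction.
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.XSUB)
    frontend.bind("tcp://*:%s" % XSUB_PORT)
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:%s" % XPUB_PORT)
    zmq.proxy(frontend, backend)  # blocks for the lifetime of the broker


def node(node_id):
    # Every node publishes its snapshots through the broker.
    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)
    pub.connect("tcp://localhost:%s" % XSUB_PORT)
    sleep(1)  # give the subscriptions time to propagate before sending
    data = {'node': node_id, 'data': 'node %s data' % node_id}
    pub.send_string('101 ' + json.dumps(data))


def listener(name):
    # Every listener receives the snapshots of all nodes from the broker.
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://localhost:%s" % XPUB_PORT)
    sub.setsockopt_string(zmq.SUBSCRIBE, '101')
    while True:
        topic, payload = sub.recv_string().split(' ', 1)
        print("[{}] received {} - {}".format(name, topic, json.loads(payload)))


if __name__ == '__main__':
    threading.Thread(target=broker, daemon=True).start()
    for i in (1, 2):
        threading.Thread(target=listener, args=('SYNC %d' % i,), daemon=True).start()
    sleep(1)
    for i in (1, 2):
        threading.Thread(target=node, args=(i,)).start()
    sleep(2)  # keep the main thread alive long enough to see the output

Because the broker binds both well-known ports, every node and every listener only ever connects, so nodes can join or leave without reconfiguring the others.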