ZMQ 设备、ioloop 和多处理
ZMQ devices, ioloop and multiprocessing
我正在尝试应用 PUSH/PULL 模式,如下图所示:
| PULL ---> Send via HTTP
| PULL ---> Send via HTTP
---- PUSH ----- DEVICE ---- | PULL ---> Send via HTTP
| PULL ---> Send via HTTP
| PULL ---> Send via HTTP
PUSH
套接字连接到 ZeroMQ 设备并发出消息,然后将这些消息传播到所有连接的 PULL
套接字。我想要实现的是一种对管道中多个节点的并行处理。
PULL
套接字完成处理后,它应该通过 HTTP 将消息转发到远程端点。
代码如下:
from multiprocessing import Process
import random
import time
import zmq
from zmq.devices import ProcessDevice
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
bind_in_port = 5559
bind_out_port = 5560
dev = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
dev.bind_in("tcp://127.0.0.1:%d" % bind_in_port)
dev.bind_out("tcp://127.0.0.1:%d" % bind_out_port)
dev.setsockopt_in(zmq.IDENTITY, b'PULL')
dev.setsockopt_out(zmq.IDENTITY, b'PUSH')
dev.start()
time.sleep(2)
def push():
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:%s" % bind_in_port)
server_id = random.randrange(1,10005)
for i in range(5):
print("Message %d sent" % i)
socket.send_string("Push from %s" % server_id)
def pull():
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:%s" % bind_out_port)
loop = ioloop.IOLoop.instance()
pull_stream = ZMQStream(socket, loop)
def on_recv(message):
print(message)
pull_stream.on_recv(on_recv)
loop.start()
Process(target=push).start()
time.sleep(2)
for i in range(2):
Process(target=pull).start()
尽管消息已正确发送到 ZeroMQ 设备,但我看不到收到任何消息 - on_recv
回调从未被调用。
感谢任何帮助。
谢谢
设备初始化中缺少上面的代码来提供完整的答案。
dev 和 *port 是什么?
1 件事可能是在 .connect 之后添加 sleep(1) 以使端口稳定
注意:无需在 push/pull
上设置身份
我正在尝试应用 PUSH/PULL 模式,如下图所示:
| PULL ---> Send via HTTP
| PULL ---> Send via HTTP
---- PUSH ----- DEVICE ---- | PULL ---> Send via HTTP
| PULL ---> Send via HTTP
| PULL ---> Send via HTTP
PUSH
套接字连接到 ZeroMQ 设备并发出消息,然后将这些消息传播到所有连接的 PULL
套接字。我想要实现的是一种对管道中多个节点的并行处理。
PULL
套接字完成处理后,它应该通过 HTTP 将消息转发到远程端点。
代码如下:
from multiprocessing import Process
import random
import time
import zmq
from zmq.devices import ProcessDevice
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
bind_in_port = 5559
bind_out_port = 5560
dev = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
dev.bind_in("tcp://127.0.0.1:%d" % bind_in_port)
dev.bind_out("tcp://127.0.0.1:%d" % bind_out_port)
dev.setsockopt_in(zmq.IDENTITY, b'PULL')
dev.setsockopt_out(zmq.IDENTITY, b'PUSH')
dev.start()
time.sleep(2)
def push():
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:%s" % bind_in_port)
server_id = random.randrange(1,10005)
for i in range(5):
print("Message %d sent" % i)
socket.send_string("Push from %s" % server_id)
def pull():
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:%s" % bind_out_port)
loop = ioloop.IOLoop.instance()
pull_stream = ZMQStream(socket, loop)
def on_recv(message):
print(message)
pull_stream.on_recv(on_recv)
loop.start()
Process(target=push).start()
time.sleep(2)
for i in range(2):
Process(target=pull).start()
尽管消息已正确发送到 ZeroMQ 设备,但我看不到收到任何消息 - on_recv
回调从未被调用。
感谢任何帮助。
谢谢
设备初始化中缺少上面的代码来提供完整的答案。 dev 和 *port 是什么? 1 件事可能是在 .connect 之后添加 sleep(1) 以使端口稳定 注意:无需在 push/pull
上设置身份