从 ZMQ PULL 套接字获取数据。如何同步计算?

Get data from ZMQ PULL socket. How to sync computation?

我有一个生产者使用 PULL / PUSH 向多个工作人员发送数据。在执行计算任务之前,所有工作人员都需要接收所有数据。

我尝试使用发送 "go" 的 PUB/SUB 套接字进行同步,但是由于 PUSH 套接字是非阻塞的,因此在数据流结束之前收到了 go...


发件人:

context = zmq.Context()
push_socket = self.context.socket(zmq.PUSH)
push_socket.bind("tcp://127.0.0.1:5557")

pull_socket = self.context.socket(zmq.PULL)
pull_socket.bind("tcp://127.0.0.1:5558")

for index, data in range(100): 
    push_socket.send_json({"data": data, "id": index})
pub_socket.send_json({"command": "map"})

接收者:

# recieve work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5557")

# receive commands
consumer_command = context.socket(zmq.SUB)
consumer_command.subscribe("")
consumer_command.connect("tcp://127.0.0.1:5559")

poller = zmq.Poller()
poller.register(consumer_receiver, zmq.POLLIN)
poller.register(consumer_command, zmq.POLLIN)

while True:
    events = dict(poller.poll(100))
    if consumer_command in events:
        received = consumer_command.recv_json()
        command = received["command"]
        print("received command : ", command)

    if consumer_receiver in events:
        received = consumer_receiver.recv_json()
        print("received data", received)

接收器输出:

received data {'data': ['Hi'], 'id': 0}
received command :  map   
received data {'data': ['hi'], 'id': 1}
...

我想要:

received data {'data': ['Hi'], 'id': 0}
received data {'data': ['hi'], 'id': 1}
...
received command :  map   

我尝试将 PUSH 套接字的 HWM 设置为 1,但没有成功。

PUSH完成后如何向所有worker发送同步消息?

您正在为命令和数据使用单独的流 - 这将始终保证同步问题。在接收方,您将有两个流缓冲区 - 第一个有大量数据要处理,第二个只有命令和 poll() 将确保通知您两个都已准备好被读取。

我看到有两种方法可以解决这个问题:

1) 保持简单:只使用一个流。你在最后发送的所有内容都会在最后收到。 TCP 保证这一点。如果您使用的是 json,您只需添加 'type': 'command' 或 'type': 'data' 来区分消息类型。

2) 如果出于某种原因,你真的需要两个流(例如,你真的想玩 publisher/subscriber 模式),接收方应该在发送方可以接收到最后一批数据之前向发送方确认发送它的命令。如果 all 工作人员需要在使用命令启动其中 any 之前接收数据,则也可以选择此选项。

您正在寻求实施障碍。

ZeroMQ完全是Actor模型编程,一个特点是发送和接收消息没有隐含的显式会合。也就是说,无论对方是否已阅读消息,发送都会return。

所以这意味着必须在 ZeroMQ 的 Actor 模型之上合成屏障(一种会合)。

  1. 使用 PUSH / PULL 套接字对将数据发送给工作人员。
  2. 使用单独的 PUSH / PULL 套接字对让工作人员向生产者发回 "I have the data and am ready to proceed" 消息。
  3. 让生产者等待这些 "I can proceed" 消息,
  4. 当它从每个工人那里收到一个时,在 PUB / SUB 套接字上向工人发送 "go" 消息。

通信顺序进程

出于兴趣,您可能希望将 Actor 模型编程与通信顺序进程进行比较(在 Rust、Erlang 和(我认为?)Go 中正在卷土重来)。在 CSP 中发送/接收消息是一个会合。这有几个好处;

  • 发件人知道邮件已收到,而不仅仅是排队,
  • 如果一个人有性能和延迟目标,它会迫使一个人正确地解决架构和资源分配问题。您无法隐藏传输中的消息。因此,如果没有提供足够的工作人员,生产者显然无法卸载消息;不能通过增加延迟暂时隐藏缺陷。
  • 如果你设法构建了一个可以死锁、活锁等的架构,它总是会的。而 Actor 模型架构可能看起来非常好多年,直到有一天网络变得有点繁忙。

要使用 CSP 做您想做的事,您可以省略上面的步骤 2 和 3。 Producer 在发送给最后一个 worker returned 时会知道每个 worker 都收到了它的数据,并且 "go" 可以立即发送出去。

就我个人而言,我真的希望 ZeroMQ 可以选择成为 CSP,而不是 Actor。然后它会很棒,而不是非常巨大。它真正优秀的地方在于它是 tcp、ipc、inproc 等都无关紧要。它们的行为都是一样的(速度明显不同)。

AFAIK Rust、Erlang 和 Go CSP 频道仅比过程更进一步。 ZMQ 可以是 inter and/or intra process and/or inter computer,这使得它非常适合开发可能超过一台计算机的系统。需要将线程卸载到另一台计算机?更改连接字符串,无需更改其他代码。很不错