为什么我用multiprocessing.Process转运行时ZeroMQ通信失败?

Why ZeroMQ fails to communicate when I use multiprocessing.Process to run?

请看下面的代码:


server.py

import zmq 
import time
from multiprocessing import Process
class A:
  def __init__(self):
    ctx = zmq.Context(1)
    sock = zmq.Socket(ctx, zmq.PUB)
    sock.bind('ipc://test')
    p = Process(target=A.run, args=(sock,))
    p.start()     # Process calls run, but the client can't receive messages
    p.join()      #
    #A.run(sock)  # this one is ok, messages get it to be received

  @staticmethod
  def run(sock):
    while True:
      sock.send('demo'.encode('utf-8'))
      print('sent')
      time.sleep(1)

if __name__ =='__main__':
  a = A()

client.py

import zmq 
ctx=zmq.Context(1)
sock = zmq.Socket(ctx, zmq.SUB)
sock.connect('ipc://test')
sock.setsockopt_string(zmq.SUBSCRIBE, '') 
while True:
  print(sock.recv())

server.py的构造函数中,如果我直接调用.run()-方法,客户端可以收到消息,但是当我使用multiprocessing.Process()-方法时,就失败了。谁能解释一下并提供一些建议?

Q : "Why ZeroMQ fails to communicate when I use multiprocessing.Process to run?"

嗯,ZeroMQ 不会通信失败,问题是,Python multiprocessing 模块如何“运行”。

该模块的设计使得某些处理可以从 python 中央 GIL-lock(re-[SERIAL]-iser,用作永远存在的 [CONCURRENT]-situations' 主要回避者).

这意味着对 multiprocessing.Process 的调用使得 python 解释器状态的一个精确的“mirror-copy”,“导出”到新的 O/S-spawned 进程中(详情取决于本地主机 O/S).

鉴于此,“镜像”副本获得 __main__ 已经拥有的资源的机会为零 - 这里 .bind() 方法已经获得 ipc://test地址,因此“远程”进程将永远不会获得“许可”来访问此 ZeroMQ 访问点,除非代码得到修复并完全 re-factored.

Q : "Can anyone explain on this and provide some advice?"

当然可以。最好的开始步骤是充分理解垄断 GIL-lock 重新 [SERIAL] 化的 Pythonic 文化,其中不会同时发生两件事,因此即使添加更多线程也可以不是 speed-up 处理流程,因为它全部由中央“垄断者”获得 re-aligned GIL-lock。

接下来,了解 python 解释器状态的完全反映副本的承诺,虽然听起来很有希望,但也有一些明显的缺点 - 作为“镜像”的新进程 - 副本不会引入冲突情况在已经拥有的资源上。如果他们尝试这样做,未按预期工作 案例是主要 ill-designed 案例中较轻微的问题。

在您的代码中,__main__ 中的第一行实例化了 a = A(),其中 A.__init__ 方法直接占据了 IPC-resource,因为 .bind('ipc://test')。后面的步骤,p = Process( target = A.run, args = ( sock, ) )“镜像”-复制 python 解释器的状态(as-is 副本)并且 p.start() 不得不崩溃以“拥有”与 __main__ 已经拥有的相同资源(是的,“镜像”进程的 ipc://test 指示调用以获取 .bind('ipc://test') 中相同的 non-free 资源)。这永远飞不起来。

最后但同样重要的是,欣赏 Zen-of-Zero,Martin SUSTRIK 为 打造的杰作,制作精良,最终可扩展,几乎 zero-latency,非常舒适,广泛移植的信号& 消息传递框架。

简短回答:启动您的子流程。在每个子进程 中的 Producer.run()-classmethod 中创建 zmq.Context- 和 .Socket- 实例。在基数为 1 的一侧使用 .bind()-方法,在基数 >1 的一侧(在本例中为“服务器”)使用 .connect()-方法。

我的方法结构类似于...

# server.py :

    import zmq
    from multiprocessing import Process

    class Producer (Process):
    
      def init(self):
        ...
    
      def run(self):
        ctx = zmq.Context(1)
        sock = zmq.Socket(ctx, zmq.PUB)
        # Multiple producers, so connect instead of bind (consumer must bind)
        sock.connect('ipc://test')
        while True:
          ...
    
    if __name__ == "__main__":
      producer = Producer()
      p = Process(target=producer.run)
      p.start()
      p.join()

# client.py :

    import zmq

    ctx = zmq.Context(1)
    sock = zmq.Socket(ctx, zmq.SUB)
    # Capture from multiple producers, so bind (producers must connect)
    sock.bind('ipc://test')
    sock.setsockopt_string(zmq.SUBSCRIBE, '') 
    while True:
      print(sock.recv())