为什么我用multiprocessing.Process转运行时ZeroMQ通信失败?
Why ZeroMQ fails to communicate when I use multiprocessing.Process to run?
请看下面的代码:
server.py
import zmq
import time
from multiprocessing import Process
class A:
def __init__(self):
ctx = zmq.Context(1)
sock = zmq.Socket(ctx, zmq.PUB)
sock.bind('ipc://test')
p = Process(target=A.run, args=(sock,))
p.start() # Process calls run, but the client can't receive messages
p.join() #
#A.run(sock) # this one is ok, messages get it to be received
@staticmethod
def run(sock):
while True:
sock.send('demo'.encode('utf-8'))
print('sent')
time.sleep(1)
if __name__ =='__main__':
a = A()
client.py
import zmq
ctx=zmq.Context(1)
sock = zmq.Socket(ctx, zmq.SUB)
sock.connect('ipc://test')
sock.setsockopt_string(zmq.SUBSCRIBE, '')
while True:
print(sock.recv())
在server.py
的构造函数中,如果我直接调用.run()
-方法,客户端可以收到消息,但是当我使用multiprocessing.Process()
-方法时,就失败了。谁能解释一下并提供一些建议?
Q : "Why ZeroMQ fails to communicate when I use multiprocessing.Process
to run?"
嗯,ZeroMQ 不会通信失败,问题是,Python multiprocessing
模块如何“运行”。
该模块的设计使得某些处理可以从 python 中央 GIL-lock(re-[SERIAL]
-iser,用作永远存在的 [CONCURRENT]
-situations' 主要回避者).
这意味着对 multiprocessing.Process
的调用使得 python 解释器状态的一个精确的“mirror-copy”,“导出”到新的 O/S-spawned 进程中(详情取决于本地主机 O/S).
鉴于此,“镜像”副本获得 __main__
已经拥有的资源的机会为零 - 这里 .bind()
方法已经获得 ipc://test
地址,因此“远程”进程将永远不会获得“许可”来访问此 ZeroMQ 访问点,除非代码得到修复并完全 re-factored.
Q : "Can anyone explain on this and provide some advice?"
当然可以。最好的开始步骤是充分理解垄断 GIL-lock 重新 [SERIAL]
化的 Pythonic 文化,其中不会同时发生两件事,因此即使添加更多线程也可以不是 speed-up 处理流程,因为它全部由中央“垄断者”获得 re-aligned GIL-lock。
接下来,了解 python 解释器状态的完全反映副本的承诺,虽然听起来很有希望,但也有一些明显的缺点 - 作为“镜像”的新进程 - 副本不会引入冲突情况在已经拥有的资源上。如果他们尝试这样做,未按预期工作 案例是主要 ill-designed 案例中较轻微的问题。
在您的代码中,__main__
中的第一行实例化了 a = A()
,其中 A
的 .__init__
方法直接占据了 IPC-resource,因为 .bind('ipc://test')
。后面的步骤,p = Process( target = A.run, args = ( sock, ) )
“镜像”-复制 python 解释器的状态(as-is 副本)并且 p.start()
不得不崩溃以“拥有”与 __main__
已经拥有的相同资源(是的,“镜像”进程的 ipc://test
指示调用以获取 .bind('ipc://test')
中相同的 non-free 资源)。这永远飞不起来。
最后但同样重要的是,欣赏 Zen-of-Zero,Martin SUSTRIK 为 distributed-computing 打造的杰作,制作精良,最终可扩展,几乎 zero-latency,非常舒适,广泛移植的信号& 消息传递框架。
简短回答:启动您的子流程。在每个子进程 中的 Producer.run()
-classmethod 中创建 zmq.Context
- 和 .Socket
- 实例。在基数为 1 的一侧使用 .bind()
-方法,在基数 >1 的一侧(在本例中为“服务器”)使用 .connect()
-方法。
我的方法结构类似于...
# server.py :
import zmq
from multiprocessing import Process
class Producer (Process):
def init(self):
...
def run(self):
ctx = zmq.Context(1)
sock = zmq.Socket(ctx, zmq.PUB)
# Multiple producers, so connect instead of bind (consumer must bind)
sock.connect('ipc://test')
while True:
...
if __name__ == "__main__":
producer = Producer()
p = Process(target=producer.run)
p.start()
p.join()
# client.py :
import zmq
ctx = zmq.Context(1)
sock = zmq.Socket(ctx, zmq.SUB)
# Capture from multiple producers, so bind (producers must connect)
sock.bind('ipc://test')
sock.setsockopt_string(zmq.SUBSCRIBE, '')
while True:
print(sock.recv())
请看下面的代码:
server.py
import zmq
import time
from multiprocessing import Process
class A:
def __init__(self):
ctx = zmq.Context(1)
sock = zmq.Socket(ctx, zmq.PUB)
sock.bind('ipc://test')
p = Process(target=A.run, args=(sock,))
p.start() # Process calls run, but the client can't receive messages
p.join() #
#A.run(sock) # this one is ok, messages get it to be received
@staticmethod
def run(sock):
while True:
sock.send('demo'.encode('utf-8'))
print('sent')
time.sleep(1)
if __name__ =='__main__':
a = A()
client.py
import zmq
ctx=zmq.Context(1)
sock = zmq.Socket(ctx, zmq.SUB)
sock.connect('ipc://test')
sock.setsockopt_string(zmq.SUBSCRIBE, '')
while True:
print(sock.recv())
在server.py
的构造函数中,如果我直接调用.run()
-方法,客户端可以收到消息,但是当我使用multiprocessing.Process()
-方法时,就失败了。谁能解释一下并提供一些建议?
Q : "Why ZeroMQ fails to communicate when I use
multiprocessing.Process
to run?"
嗯,ZeroMQ 不会通信失败,问题是,Python multiprocessing
模块如何“运行”。
该模块的设计使得某些处理可以从 python 中央 GIL-lock(re-[SERIAL]
-iser,用作永远存在的 [CONCURRENT]
-situations' 主要回避者).
这意味着对 multiprocessing.Process
的调用使得 python 解释器状态的一个精确的“mirror-copy”,“导出”到新的 O/S-spawned 进程中(详情取决于本地主机 O/S).
鉴于此,“镜像”副本获得 __main__
已经拥有的资源的机会为零 - 这里 .bind()
方法已经获得 ipc://test
地址,因此“远程”进程将永远不会获得“许可”来访问此 ZeroMQ 访问点,除非代码得到修复并完全 re-factored.
Q : "Can anyone explain on this and provide some advice?"
当然可以。最好的开始步骤是充分理解垄断 GIL-lock 重新 [SERIAL]
化的 Pythonic 文化,其中不会同时发生两件事,因此即使添加更多线程也可以不是 speed-up 处理流程,因为它全部由中央“垄断者”获得 re-aligned GIL-lock。
接下来,了解 python 解释器状态的完全反映副本的承诺,虽然听起来很有希望,但也有一些明显的缺点 - 作为“镜像”的新进程 - 副本不会引入冲突情况在已经拥有的资源上。如果他们尝试这样做,未按预期工作 案例是主要 ill-designed 案例中较轻微的问题。
在您的代码中,__main__
中的第一行实例化了 a = A()
,其中 A
的 .__init__
方法直接占据了 IPC-resource,因为 .bind('ipc://test')
。后面的步骤,p = Process( target = A.run, args = ( sock, ) )
“镜像”-复制 python 解释器的状态(as-is 副本)并且 p.start()
不得不崩溃以“拥有”与 __main__
已经拥有的相同资源(是的,“镜像”进程的 ipc://test
指示调用以获取 .bind('ipc://test')
中相同的 non-free 资源)。这永远飞不起来。
最后但同样重要的是,欣赏 Zen-of-Zero,Martin SUSTRIK 为 distributed-computing 打造的杰作,制作精良,最终可扩展,几乎 zero-latency,非常舒适,广泛移植的信号& 消息传递框架。
简短回答:启动您的子流程。在每个子进程 中的 Producer.run()
-classmethod 中创建 zmq.Context
- 和 .Socket
- 实例。在基数为 1 的一侧使用 .bind()
-方法,在基数 >1 的一侧(在本例中为“服务器”)使用 .connect()
-方法。
我的方法结构类似于...
# server.py :
import zmq
from multiprocessing import Process
class Producer (Process):
def init(self):
...
def run(self):
ctx = zmq.Context(1)
sock = zmq.Socket(ctx, zmq.PUB)
# Multiple producers, so connect instead of bind (consumer must bind)
sock.connect('ipc://test')
while True:
...
if __name__ == "__main__":
producer = Producer()
p = Process(target=producer.run)
p.start()
p.join()
# client.py :
import zmq
ctx = zmq.Context(1)
sock = zmq.Socket(ctx, zmq.SUB)
# Capture from multiple producers, so bind (producers must connect)
sock.bind('ipc://test')
sock.setsockopt_string(zmq.SUBSCRIBE, '')
while True:
print(sock.recv())