pyzmq 发布者可以从 class 个实例运行吗?
Can pyzmq publishers be operated from class instances?
我有以下发布者代码,它实例化了几个 class 实例并发布了一些消息。
但是,我在订阅者端没有收到任何东西。
出版商
import zmq
import time
from multiprocessing import Process
class SendData:
def __init__(self, msg, port):
self.msg = msg
self.port = port
ctx = zmq.Context()
self.sock = ctx.socket(zmq.PUB)
self.sock.bind('tcp://127.0.0.1:'+str(self.port))
time.sleep(1)
def sender(self):
self.sock.send_json(self.msg)
def main():
for device, port in zip(['2.2.2.2', '5.5.5.5'],[5001, 5002]):
msg = {device:'Some random message'}
instance = SendData(device, port)
Process(target=instance.sender).start()
if __name__ == "__main__":
main()
订阅者
import zmq
ctx = zmq.Context()
recv_sock1 = ctx.socket(zmq.SUB)
recv_sock1.connect('tcp://127.0.0.1:5001')
recv_sock1.setsockopt(zmq.SUBSCRIBE, '')
recv_sock2 = ctx.socket(zmq.SUB)
recv_sock2.connect('tcp://127.0.0.1:5002')
recv_sock2.setsockopt(zmq.SUBSCRIBE, '')
while True:
if recv_sock1.poll(10):
msg = recv_sock1.recv_json()
print msg
if recv_sock2.poll(10):
msg = recv_sock2.recv_json()
print msg
在发布者可以发布任何内容之前,我已经开始订阅。此外,我可以看到 TCP 连接已建立,因此已建立连接。
- pyzmq 版本 16.0.0
- python 版本 2.7
Q1: class 个实例是否支持 0mq 发布者?
Q2:我错过了什么?
A1:是的,他们是。
A2:使用范围的冲突v/s零共享,ZeroMQ 格言之一
一旦您的原始 Publisher 代码在 main()
中执行,class 实例化进程创建(即在 main()
-进程中使用范围),通过 .__init__()
构造函数方法,它是自己的 Context()
-实例,因此属于(包括它的所有派生子 -对象(套接字等))到这个 main()
-process.
接下来,调用 Process(...)
启动另外几个进程,接收 class-instances(遗憾的是这些已经创建ZeroMQ 不可共享玩具 ) 来自 main()
-scope-of-use.
解决方案?
一个可能的肮脏的快速 hack 可能是推迟 ZeroMQ Context()
实例化——是的,只需将它从 .__init__()
移动到某个 [=35] =].aDeferredSETUP()
将在每个 spinned-of Process()
-process 中的不同使用范围下具体执行,不同于 main()
-process 和你应该这样做,因为零共享是安全遵守的。
class SendData:
def __init__(self, msg, port):
self.msg = msg
self.port = port
self.NotSETUP = True
self.ctx = None
self.sock = None
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ L8R
# ctx = zmq.Context()
# self.sock = ctx.socket( zmq.PUB )
# self.sock.bind( 'tcp://127.0.0.1:' + str( self.port ) )
# time.sleep( 1 )
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ L8R
def sender( self ):
if self.NotSETUP:
self.aDeferredSETUP()
self.sock.send_json( self.msg )
def aDeferredSETUP( self ): # create I/O-threads in Process(), not main()
self.ctx = zmq.Context()
self.sock = self.ctx.socket( zmq.PUB )
self.sock.bind( 'tcp://127.0.0.1:' + str( self.port ) )
time.sleep( 1 )
如前所述,尝试在进程之间共享 ZeroMQ 上下文是这里的问题,user3666197 的解决方案将起作用。
但是,在这种情况下,我建议继承 multiprocessing.Process
。这样一来,代码的哪一部分在哪个进程中执行就清楚多了。它还使您的代码更具可读性和可重用性。
以下代码为每个设备创建一个发送进程。可以在程序运行期间重用发送进程以发送更多数据:
import multiprocessing
import queue
import zmq
import time
class Sender(multiprocessing.Process):
def __init__(self, port):
super(Sender, self).__init__()
self._port = port
self._messages = multiprocessing.Queue()
self._do_stop = multiprocesing.Event()
def run(self):
"""
This code is executed in a new process.
"""
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind("tcp://127.0.0.1:" + str(self._port))
while not self._do_stop.is_set():
try:
msg = self._message.get_nowait()
sock.send_json(msg)
except queue.Empty:
time.sleep(0.01)
def stop(self):
self._do_stop.set()
def send_message(self, msg):
self._messages.put(msg)
def main():
data = zip(['2.2.2.2', '5.5.5.5'], [5001, 5002])
# create senders
senders = {device: Sender(port) for device, port in data}
# start senders
for device in senders:
senders[device].start()
# send messages
for device, port in zip(['2.2.2.2', '5.5.5.5'],[5001, 5002]):
msg = {device: 'Some random message'}
senders[device].send_message(msg)
# do more stuff here....
# ... e.g. send more messages
# ...
# once you are finished, stop the subprocesses
for device in senders:
senders[device].stop()
希望对您的问题有所帮助。
我有以下发布者代码,它实例化了几个 class 实例并发布了一些消息。
但是,我在订阅者端没有收到任何东西。
出版商
import zmq
import time
from multiprocessing import Process
class SendData:
def __init__(self, msg, port):
self.msg = msg
self.port = port
ctx = zmq.Context()
self.sock = ctx.socket(zmq.PUB)
self.sock.bind('tcp://127.0.0.1:'+str(self.port))
time.sleep(1)
def sender(self):
self.sock.send_json(self.msg)
def main():
for device, port in zip(['2.2.2.2', '5.5.5.5'],[5001, 5002]):
msg = {device:'Some random message'}
instance = SendData(device, port)
Process(target=instance.sender).start()
if __name__ == "__main__":
main()
订阅者
import zmq
ctx = zmq.Context()
recv_sock1 = ctx.socket(zmq.SUB)
recv_sock1.connect('tcp://127.0.0.1:5001')
recv_sock1.setsockopt(zmq.SUBSCRIBE, '')
recv_sock2 = ctx.socket(zmq.SUB)
recv_sock2.connect('tcp://127.0.0.1:5002')
recv_sock2.setsockopt(zmq.SUBSCRIBE, '')
while True:
if recv_sock1.poll(10):
msg = recv_sock1.recv_json()
print msg
if recv_sock2.poll(10):
msg = recv_sock2.recv_json()
print msg
在发布者可以发布任何内容之前,我已经开始订阅。此外,我可以看到 TCP 连接已建立,因此已建立连接。
- pyzmq 版本 16.0.0
- python 版本 2.7
Q1: class 个实例是否支持 0mq 发布者?
Q2:我错过了什么?
A1:是的,他们是。
A2:使用范围的冲突v/s零共享,ZeroMQ 格言之一
一旦您的原始 Publisher 代码在 main()
中执行,class 实例化进程创建(即在 main()
-进程中使用范围),通过 .__init__()
构造函数方法,它是自己的 Context()
-实例,因此属于(包括它的所有派生子 -对象(套接字等))到这个 main()
-process.
接下来,调用 Process(...)
启动另外几个进程,接收 class-instances(遗憾的是这些已经创建ZeroMQ 不可共享玩具 ) 来自 main()
-scope-of-use.
解决方案?
一个可能的肮脏的快速 hack 可能是推迟 ZeroMQ Context()
实例化——是的,只需将它从 .__init__()
移动到某个 [=35] =].aDeferredSETUP()
将在每个 spinned-of Process()
-process 中的不同使用范围下具体执行,不同于 main()
-process 和你应该这样做,因为零共享是安全遵守的。
class SendData:
def __init__(self, msg, port):
self.msg = msg
self.port = port
self.NotSETUP = True
self.ctx = None
self.sock = None
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ L8R
# ctx = zmq.Context()
# self.sock = ctx.socket( zmq.PUB )
# self.sock.bind( 'tcp://127.0.0.1:' + str( self.port ) )
# time.sleep( 1 )
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ L8R
def sender( self ):
if self.NotSETUP:
self.aDeferredSETUP()
self.sock.send_json( self.msg )
def aDeferredSETUP( self ): # create I/O-threads in Process(), not main()
self.ctx = zmq.Context()
self.sock = self.ctx.socket( zmq.PUB )
self.sock.bind( 'tcp://127.0.0.1:' + str( self.port ) )
time.sleep( 1 )
如前所述,尝试在进程之间共享 ZeroMQ 上下文是这里的问题,user3666197 的解决方案将起作用。
但是,在这种情况下,我建议继承 multiprocessing.Process
。这样一来,代码的哪一部分在哪个进程中执行就清楚多了。它还使您的代码更具可读性和可重用性。
以下代码为每个设备创建一个发送进程。可以在程序运行期间重用发送进程以发送更多数据:
import multiprocessing
import queue
import zmq
import time
class Sender(multiprocessing.Process):
def __init__(self, port):
super(Sender, self).__init__()
self._port = port
self._messages = multiprocessing.Queue()
self._do_stop = multiprocesing.Event()
def run(self):
"""
This code is executed in a new process.
"""
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind("tcp://127.0.0.1:" + str(self._port))
while not self._do_stop.is_set():
try:
msg = self._message.get_nowait()
sock.send_json(msg)
except queue.Empty:
time.sleep(0.01)
def stop(self):
self._do_stop.set()
def send_message(self, msg):
self._messages.put(msg)
def main():
data = zip(['2.2.2.2', '5.5.5.5'], [5001, 5002])
# create senders
senders = {device: Sender(port) for device, port in data}
# start senders
for device in senders:
senders[device].start()
# send messages
for device, port in zip(['2.2.2.2', '5.5.5.5'],[5001, 5002]):
msg = {device: 'Some random message'}
senders[device].send_message(msg)
# do more stuff here....
# ... e.g. send more messages
# ...
# once you are finished, stop the subprocesses
for device in senders:
senders[device].stop()
希望对您的问题有所帮助。