ZeroMQ 订阅者未通过 inproc 从发布者接收消息:传输 class
ZeroMQ Subscribers not receiving message from Publisher over an inproc: transport class
我是 pyzmq
的新手。我试图理解 inproc:
传输 class 并创建了这个示例示例来玩。
看起来 Publisher
实例正在发布消息,但 Subscriber
实例是 没有收到任何。
如果我将 Subscriber
个实例移动到单独的 process
并将 inproc:
更改为 tcp:
传输 class,示例有效。
代码如下:
import threading
import time
import zmq
context = zmq.Context.instance()
address = 'inproc://test'
class Publisher(threading.Thread):
def __init__(self):
self.socket = context.socket(zmq.PUB)
self.socket.bind(address)
def run(self):
while True:
message = 'snapshot,current_time_%s' % str(time.time())
print 'sending message %s' % message
self.socket.send(message)
time.sleep(1)
class Subscriber(object):
def __init__(self, sub_name):
self.name = sub_name
self.socket = context.socket(zmq.SUB)
self.socket.connect(address)
def listen(self):
while True:
try:
msg = self.socket.recv()
a, b = msg.split(' ', 1)
print 'Received message -> %s-%s-%s' % (self.name, a, b)
except zmq.ZMQError as e:
logger.exception(e)
if __name__ == '__main__':
thread_a = []
for i in range(0, 1):
subs = Subscriber('subscriber_%s' % str(i))
th = threading.Thread(target=subs.listen)
thread_a.append(th)
th.start()
pub = Publisher()
pub_th = threading.Thread(target=pub.run)
pub_th.start()
没有错,但是
ZeroMQ 是一个很棒的工具箱。
它在引擎盖下充满了智能、明亮和自适应的服务,在许多方面确实拯救了我们可怜的生活。
仍然值得阅读并遵守文档中的一些规则。
inproc
运输 class 有一个这样的。 .bind()
先,.connect()
-s
之前
[ Page 38, Code Connected, Volume I ]
... inproc
is an inter-thread signalling transport ... it is faster than tcp
or ipc
. This transport has a specific limitation compared to tpc
and icp
: the server must issue a bind
before any client issues a connect
. This is something future versions of ØMQ may fix, but at present this defines how you use inproc
sockets.
所以,举个例子:
if __name__ == '__main__':
pub = Publisher()
pub_th = threading.Thread( target = pub.run )
pub_th.start()
# give it a place to start before .connect()-s may take place
# give it a time to start before .connect()-s may take place
sleep(0.5)
thread_a = []
for i in range( 0, 1 ):
subs = Subscriber( 'subscriber_%s' % str( i ) )
th = threading.Thread( target = subs.listen )
thread_a.append( th )
th.start()
我是 pyzmq
的新手。我试图理解 inproc:
传输 class 并创建了这个示例示例来玩。
看起来 Publisher
实例正在发布消息,但 Subscriber
实例是 没有收到任何。
如果我将 Subscriber
个实例移动到单独的 process
并将 inproc:
更改为 tcp:
传输 class,示例有效。
代码如下:
import threading
import time
import zmq
context = zmq.Context.instance()
address = 'inproc://test'
class Publisher(threading.Thread):
def __init__(self):
self.socket = context.socket(zmq.PUB)
self.socket.bind(address)
def run(self):
while True:
message = 'snapshot,current_time_%s' % str(time.time())
print 'sending message %s' % message
self.socket.send(message)
time.sleep(1)
class Subscriber(object):
def __init__(self, sub_name):
self.name = sub_name
self.socket = context.socket(zmq.SUB)
self.socket.connect(address)
def listen(self):
while True:
try:
msg = self.socket.recv()
a, b = msg.split(' ', 1)
print 'Received message -> %s-%s-%s' % (self.name, a, b)
except zmq.ZMQError as e:
logger.exception(e)
if __name__ == '__main__':
thread_a = []
for i in range(0, 1):
subs = Subscriber('subscriber_%s' % str(i))
th = threading.Thread(target=subs.listen)
thread_a.append(th)
th.start()
pub = Publisher()
pub_th = threading.Thread(target=pub.run)
pub_th.start()
没有错,但是
ZeroMQ 是一个很棒的工具箱。
它在引擎盖下充满了智能、明亮和自适应的服务,在许多方面确实拯救了我们可怜的生活。
仍然值得阅读并遵守文档中的一些规则。
inproc
运输 class 有一个这样的。 .bind()
先,.connect()
-s
[ Page 38, Code Connected, Volume I ]
...inproc
is an inter-thread signalling transport ... it is faster thantcp
oripc
. This transport has a specific limitation compared totpc
andicp
: the server must issue abind
before any client issues aconnect
. This is something future versions of ØMQ may fix, but at present this defines how you useinproc
sockets.
所以,举个例子:
if __name__ == '__main__':
pub = Publisher()
pub_th = threading.Thread( target = pub.run )
pub_th.start()
# give it a place to start before .connect()-s may take place
# give it a time to start before .connect()-s may take place
sleep(0.5)
thread_a = []
for i in range( 0, 1 ):
subs = Subscriber( 'subscriber_%s' % str( i ) )
th = threading.Thread( target = subs.listen )
thread_a.append( th )
th.start()