ZMQ 对(用于发信号)由于连接不良而阻塞
ZMQ pair (for signaling) is blocking because of bad connection
我有两个线程。一个是 Worker Thread
,另一个是 Communication Thread
.
Worker Thread
正在从串口读取数据,进行一些处理,然后将结果排队发送到服务器。
Communication Tthread
正在从队列中读取结果并发送。挑战在于连接是无线的,虽然通常存在,但它可能时断时续(几分钟内进出范围),如果我失去连接,我不想阻止 Worker Thread
。
我为此选择的模式如下:
Worker Thread
有一个 enqueue
方法,该方法将消息添加到 Queue
,然后使用 zmq.PAIR
向 inproc://signal
发送信号。
Communication Thread
使用 zmq.DEALER
与服务器通信(zmq.ROUTER
),但轮询 inproc://signal
对以注册是否有新消息需要发不发
以下是模式的简化示例:
import Queue
import zmq
import time
import threading
import simplejson
class ZmqPattern():
def __init__(self):
self.q_out = Queue.Queue()
self.q_in = Queue.Queue()
self.signal = None
self.API_KEY = 'SOMETHINGCOMPLEX'
self.zmq_comm_thr = None
def start_zmq_signal(self):
self.context = zmq.Context()
# signal socket for waking the zmq thread to send messages to the relay
self.signal = self.context.socket(zmq.PAIR)
self.signal.bind("inproc://signal")
def enqueue(self, msg):
print("> pre-enqueue")
self.q_out.put(msg)
print("< post-enqueue")
print(") send sig")
self.signal.send(b"")
print("( sig sent")
def communication_thread(self, q_out):
poll = zmq.Poller()
self.endpoint_url = 'tcp://' + '127.0.0.1' + ':' + '9001'
wake = self.context.socket(zmq.PAIR)
wake.connect("inproc://signal")
poll.register(wake, zmq.POLLIN)
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.API_KEY)
self.socket.connect(self.endpoint_url)
poll.register(self.socket, zmq.POLLIN)
while True:
sockets = dict(poll.poll())
if self.socket in sockets:
message = self.socket.recv()
message = simplejson.loads(message)
# Incomming messages which need to be handled on the worker thread
self.q_in.put(message)
if wake in sockets:
wake.recv()
while not q_out.empty():
print(">> Popping off Queue")
message = q_out.get()
print(">>> Popped off Queue")
message = simplejson.dumps(message)
print("<<< About to be sent")
self.socket.send(message)
print("<< Sent")
def start(self):
self.start_zmq_signal()
# ZMQ Thread
self.zmq_comm_thr = threading.Thread(target=self.communication_thread, args=([self.q_out]))
self.zmq_comm_thr.daemon = True
self.zmq_comm_thr.name = "ZMQ Thread"
self.zmq_comm_thr.start()
if __name__ == '__main__':
test = ZmqPattern()
test.start()
print '###############################################'
print '############## Starting comms #################'
print "###############################################"
last_debug = time.time()
test_msg = {}
for c in xrange(1000):
key = 'something{}'.format(c)
val = 'important{}'.format(c)
test_msg[key] = val
while True:
test.enqueue(test_msg)
if time.time() - last_debug > 1:
last_debug = time.time()
print "Still alive..."
如果你 运行 这个,你会看到经销商阻塞,因为另一端没有路由器,不久之后,由于 Communication Thread
没有收到
我应该如何最好地设置 inproc zmq 以不阻塞 Worker Thread
。
仅供参考,整个系统最多需要缓冲 20 万条消息,每条消息大约 256 字节。
经销商套接字对其存储的消息数量有限制,称为高水位线。在您的经销商套接字创建下方,尝试:
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.SNDHWM, 200000)
并把这个数字设置得尽可能高;限制是您机器的内存。
编辑:
这个问题中关于高水位标记的一些很好的讨论:
Majordomo broker: handling large number of connections
我有两个线程。一个是 Worker Thread
,另一个是 Communication Thread
.
Worker Thread
正在从串口读取数据,进行一些处理,然后将结果排队发送到服务器。
Communication Tthread
正在从队列中读取结果并发送。挑战在于连接是无线的,虽然通常存在,但它可能时断时续(几分钟内进出范围),如果我失去连接,我不想阻止 Worker Thread
。
我为此选择的模式如下:
Worker Thread
有一个 enqueue
方法,该方法将消息添加到 Queue
,然后使用 zmq.PAIR
向 inproc://signal
发送信号。
Communication Thread
使用 zmq.DEALER
与服务器通信(zmq.ROUTER
),但轮询 inproc://signal
对以注册是否有新消息需要发不发
以下是模式的简化示例:
import Queue
import zmq
import time
import threading
import simplejson
class ZmqPattern():
def __init__(self):
self.q_out = Queue.Queue()
self.q_in = Queue.Queue()
self.signal = None
self.API_KEY = 'SOMETHINGCOMPLEX'
self.zmq_comm_thr = None
def start_zmq_signal(self):
self.context = zmq.Context()
# signal socket for waking the zmq thread to send messages to the relay
self.signal = self.context.socket(zmq.PAIR)
self.signal.bind("inproc://signal")
def enqueue(self, msg):
print("> pre-enqueue")
self.q_out.put(msg)
print("< post-enqueue")
print(") send sig")
self.signal.send(b"")
print("( sig sent")
def communication_thread(self, q_out):
poll = zmq.Poller()
self.endpoint_url = 'tcp://' + '127.0.0.1' + ':' + '9001'
wake = self.context.socket(zmq.PAIR)
wake.connect("inproc://signal")
poll.register(wake, zmq.POLLIN)
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.API_KEY)
self.socket.connect(self.endpoint_url)
poll.register(self.socket, zmq.POLLIN)
while True:
sockets = dict(poll.poll())
if self.socket in sockets:
message = self.socket.recv()
message = simplejson.loads(message)
# Incomming messages which need to be handled on the worker thread
self.q_in.put(message)
if wake in sockets:
wake.recv()
while not q_out.empty():
print(">> Popping off Queue")
message = q_out.get()
print(">>> Popped off Queue")
message = simplejson.dumps(message)
print("<<< About to be sent")
self.socket.send(message)
print("<< Sent")
def start(self):
self.start_zmq_signal()
# ZMQ Thread
self.zmq_comm_thr = threading.Thread(target=self.communication_thread, args=([self.q_out]))
self.zmq_comm_thr.daemon = True
self.zmq_comm_thr.name = "ZMQ Thread"
self.zmq_comm_thr.start()
if __name__ == '__main__':
test = ZmqPattern()
test.start()
print '###############################################'
print '############## Starting comms #################'
print "###############################################"
last_debug = time.time()
test_msg = {}
for c in xrange(1000):
key = 'something{}'.format(c)
val = 'important{}'.format(c)
test_msg[key] = val
while True:
test.enqueue(test_msg)
if time.time() - last_debug > 1:
last_debug = time.time()
print "Still alive..."
如果你 运行 这个,你会看到经销商阻塞,因为另一端没有路由器,不久之后,由于 Communication Thread
没有收到
我应该如何最好地设置 inproc zmq 以不阻塞 Worker Thread
。
仅供参考,整个系统最多需要缓冲 20 万条消息,每条消息大约 256 字节。
经销商套接字对其存储的消息数量有限制,称为高水位线。在您的经销商套接字创建下方,尝试:
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.SNDHWM, 200000)
并把这个数字设置得尽可能高;限制是您机器的内存。
编辑:
这个问题中关于高水位标记的一些很好的讨论:
Majordomo broker: handling large number of connections