ZMQ 对(用于发信号)由于连接不良而阻塞

ZMQ pair (for signaling) is blocking because of bad connection

我有两个线程。一个是 Worker Thread,另一个是 Communication Thread.

Worker Thread 正在从串口读取数据,进行一些处理,然后将结果排队发送到服务器。

Communication Tthread 正在从队列中读取结果并发送。挑战在于连接是无线的,虽然通常存在,但它可能时断时续(几分钟内进出范围),如果我失去连接,我不想阻止 Worker Thread

我为此选择的模式如下:

Worker Thread 有一个 enqueue 方法,该方法将消息添加到 Queue,然后使用 zmq.PAIRinproc://signal 发送信号。

Communication Thread 使用 zmq.DEALER 与服务器通信(zmq.ROUTER),但轮询 inproc://signal 对以注册是否有新消息需要发不发

以下是模式的简化示例:

import Queue
import zmq
import time
import threading
import simplejson


class ZmqPattern():
    def __init__(self):
        self.q_out = Queue.Queue()
        self.q_in = Queue.Queue()
        self.signal = None
        self.API_KEY = 'SOMETHINGCOMPLEX'
        self.zmq_comm_thr = None

    def start_zmq_signal(self):
        self.context = zmq.Context()

        # signal socket for waking the zmq thread to send messages to the relay
        self.signal = self.context.socket(zmq.PAIR)
        self.signal.bind("inproc://signal")

    def enqueue(self, msg):
        print("> pre-enqueue")
        self.q_out.put(msg)
        print("< post-enqueue")

        print(") send sig")
        self.signal.send(b"")
        print("( sig sent")

    def communication_thread(self, q_out):
        poll = zmq.Poller()

        self.endpoint_url = 'tcp://' + '127.0.0.1' + ':' + '9001'

        wake = self.context.socket(zmq.PAIR)
        wake.connect("inproc://signal")
        poll.register(wake, zmq.POLLIN)

        self.socket = self.context.socket(zmq.DEALER)
        self.socket.setsockopt(zmq.IDENTITY, self.API_KEY)
        self.socket.connect(self.endpoint_url)
        poll.register(self.socket, zmq.POLLIN)

        while True:
            sockets = dict(poll.poll())

            if self.socket in sockets:
                message = self.socket.recv()
                message = simplejson.loads(message)

                # Incomming messages which need to be handled on the worker thread
                self.q_in.put(message)

            if wake in sockets:
                wake.recv()
                while not q_out.empty():
                    print(">> Popping off Queue")
                    message = q_out.get()
                    print(">>> Popped off Queue")
                    message = simplejson.dumps(message)
                    print("<<< About to be sent")
                    self.socket.send(message)
                    print("<< Sent")

    def start(self):
        self.start_zmq_signal()
        # ZMQ Thread
        self.zmq_comm_thr = threading.Thread(target=self.communication_thread, args=([self.q_out]))
        self.zmq_comm_thr.daemon = True
        self.zmq_comm_thr.name = "ZMQ Thread"
        self.zmq_comm_thr.start()


if __name__ == '__main__':
    test = ZmqPattern()
    test.start()

    print '###############################################'
    print '############## Starting comms #################'
    print "###############################################"

    last_debug = time.time()
    test_msg = {}
    for c in xrange(1000):
        key = 'something{}'.format(c)
        val = 'important{}'.format(c)
        test_msg[key] = val

    while True:
        test.enqueue(test_msg)
        if time.time() - last_debug > 1:
            last_debug = time.time()
            print "Still alive..."

如果你 运行 这个,你会看到经销商阻塞,因为另一端没有路由器,不久之后,由于 Communication Thread 没有收到

我应该如何最好地设置 inproc zmq 以不阻塞 Worker Thread

仅供参考,整个系统最多需要缓冲 20 万条消息,每条消息大约 256 字节。

经销商套接字对其存储的消息数量有限制,称为高水位线。在您的经销商套接字创建下方,尝试:

    self.socket = self.context.socket(zmq.DEALER)
    self.socket.setsockopt(zmq.SNDHWM, 200000)

并把这个数字设置得尽可能高;限制是您机器的内存。

编辑:

这个问题中关于高水位标记的一些很好的讨论:

Majordomo broker: handling large number of connections