PyZMQ 订阅者在使用请求套接字时不接收消息

PyZMQ Subscriber doesn't receive messages when working with request socket

我正在使用 PyZMQ,我遇到了一个似乎相当特殊的问题。我有两个 classes 正在包装用于通信的套接字,MZLSubscriberMZLRequester。有一个 class 包含了它们,MZLLink。对于其中的每一个,我还进行了测试 MZLSubscriberTestMZLRequesterTestMZLinkTest。订阅者和请求者的测试按预期工作,但 MZLinkTest 没有收到任何订阅者消息。

下面似乎是相关代码,它们是 3 个 class 的构造函数以及 MZLSubscriberrun() 和 [=26= 的测试] 和 MZLSubscriber.

MZLink构造函数:

    # Host information
    self.host = host
    self.requestPort = requestPort
    self.subscriberPort = subscriberPort

    # Set up zmq context
    self.zmq_context = zmq.Context()

    # Set up subscriber and replier
    self.subscriber = MZLSubscriber(self.zmq_context, self.host, self.subscriberPort)
    self.requester = MZLRequester(self.zmq_context, self.host, self.requestPort)

    # Start subscriber
    self.subscriber.start()

MZLink 测试:

# Constants
HOST = "localhost"
REQ_PORT = 5555
SUB_PORT = 5556

# Create Link
link = MIDASZMQLink(HOST, REQ_PORT, SUB_PORT)

link.close()

MZLRequester构造函数:

    # Initialize class member variables
    self.zmq_context = zmq_context
    self.host = host
    self.port = port

    # Set up reply socket
    self.socket = self.zmq_context.socket(zmq.REQ)

    # Connect socket
    self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))

MZLSubscriber构造函数:

   # Initialize parent process
    Process.__init__(self)

    # Store zmq context and connection host/port
    self.zmq_context = zmq_context
    self.host = host
    self.port = port

    # Sockets. Don't set them up here because sockets are not thread safe
    self.socket = None

    # Queue to store data in
    # TODO: Make queue not overflow if events come in too quickly
    self.queue = Queue()

MZLSubscriber.run():

    # Parent call
    Process.run(self)

    # Set up subscriber socket in this thread
    self.socket = self.zmq_context.socket(zmq.SUB)
    self.socket.setsockopt_string(zmq.SUBSCRIBE, unicode())

    # Connect socket
    self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))

    # While the thread is alive, poll for data to put into queue
    # Calling MZLSubscriber.stop() will automatically change this
    while self.is_alive():
        datum = self.socket.recv()
        self.queue.put(datum)

    # Disconnect and close socket.
    #FIXME: Doesn't get here because terminate() immediately stops the process
    self.socket.disconnect("tcp://{0}:{1}".format(self.host, self.port))
    self.socket.close()

MZLSubscriber 测试:

# Host information
HOST = "localhost"
SUBSCRIBER_PORT = "5556"

# Set up zmq context
zmq_context = zmq.Context()

# Set up subscriber
subscriber = MZLSubscriber(zmq_context, HOST, SUBSCRIBER_PORT)

# Start subscriber
subscriber.start()

# Stop and join subscriber
subscriber.close()
subscriber.join()

订户线程似乎在 datum = self.socket.recv() 阻塞,这让我认为这可能是套接字创建的一些问题。但是,它似乎仅在与订户合作时才有效。请求者似乎在这两种情况下都有效。此外,只需注释掉处理 requester.

的两行,一切就会顺利进行

对于代码墙,我深表歉意,但目前我什至无法缩小问题出在哪些代码范围内。当我这样做时,我会删除不相关的代码。处理传入数据的测试代码已被删除。

澄清一下,我使用的是 Python 2.7 和 PyZMQ 14.3.1。

更新: 似乎 运行 MZLSubscriber 在主线程中而不是创建另一个 Process 导致预期的结果,所以看起来这可能是某种线程安全。据我所知,zmq 上下文是线程安全的,但套接字不是。我认为这不会引起问题,因为我明确确保每个线程都有一个套接字。

更新 2: 如果在 MZLSubscriber 中设置套接字的调用从 run() 移动到 __init__,则套接字似乎收到一小部分已发布的消息,但确实有错误:

Process MZLSubscriber-1:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/Users/user/Repos/midas-client/client/midasclient/mzlSubscriber.py", line 45, in run
    datum = self.socket.recv()
  File "socket.pyx", line 628, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5616)
  File "socket.pyx", line 662, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5436)
  File "socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1771)
  File "checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6032)
ZMQError: Interrupted system call

我已经通过在 MZLSubscriber.run() 中创建一个新的 zmq.Context 来解决这个问题,尽管我觉得如果 zmq 上下文是线程安全的,这就没有必要了。

看来我的问题是在不同进程上使用多个 zmq 上下文。虽然 PyZMQ 文档指出 zmq 上下文是线程安全的,但我只能假设它意味着 Python 个线程而不是进程。这非常令人困惑,因为在 C 中,zmq 上下文是线程安全的,尽管 运行 类似于 Python multiprocessing.Process.

问题已通过为每个进程创建 zmq 上下文解决。