PyZMQ 订阅者在使用请求套接字时不接收消息
PyZMQ Subscriber doesn't receive messages when working with request socket
我正在使用 PyZMQ,我遇到了一个似乎相当特殊的问题。我有两个 classes 正在包装用于通信的套接字,MZLSubscriber
和 MZLRequester
。有一个 class 包含了它们,MZLLink
。对于其中的每一个,我还进行了测试 MZLSubscriberTest
、MZLRequesterTest
和 MZLinkTest
。订阅者和请求者的测试按预期工作,但 MZLinkTest
没有收到任何订阅者消息。
下面似乎是相关代码,它们是 3 个 class 的构造函数以及 MZLSubscriber
的 run()
和 [=26= 的测试] 和 MZLSubscriber
.
MZLink
构造函数:
# Host information
self.host = host
self.requestPort = requestPort
self.subscriberPort = subscriberPort
# Set up zmq context
self.zmq_context = zmq.Context()
# Set up subscriber and replier
self.subscriber = MZLSubscriber(self.zmq_context, self.host, self.subscriberPort)
self.requester = MZLRequester(self.zmq_context, self.host, self.requestPort)
# Start subscriber
self.subscriber.start()
MZLink
测试:
# Constants
HOST = "localhost"
REQ_PORT = 5555
SUB_PORT = 5556
# Create Link
link = MIDASZMQLink(HOST, REQ_PORT, SUB_PORT)
link.close()
MZLRequester
构造函数:
# Initialize class member variables
self.zmq_context = zmq_context
self.host = host
self.port = port
# Set up reply socket
self.socket = self.zmq_context.socket(zmq.REQ)
# Connect socket
self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))
MZLSubscriber
构造函数:
# Initialize parent process
Process.__init__(self)
# Store zmq context and connection host/port
self.zmq_context = zmq_context
self.host = host
self.port = port
# Sockets. Don't set them up here because sockets are not thread safe
self.socket = None
# Queue to store data in
# TODO: Make queue not overflow if events come in too quickly
self.queue = Queue()
MZLSubscriber.run()
:
# Parent call
Process.run(self)
# Set up subscriber socket in this thread
self.socket = self.zmq_context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE, unicode())
# Connect socket
self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))
# While the thread is alive, poll for data to put into queue
# Calling MZLSubscriber.stop() will automatically change this
while self.is_alive():
datum = self.socket.recv()
self.queue.put(datum)
# Disconnect and close socket.
#FIXME: Doesn't get here because terminate() immediately stops the process
self.socket.disconnect("tcp://{0}:{1}".format(self.host, self.port))
self.socket.close()
MZLSubscriber
测试:
# Host information
HOST = "localhost"
SUBSCRIBER_PORT = "5556"
# Set up zmq context
zmq_context = zmq.Context()
# Set up subscriber
subscriber = MZLSubscriber(zmq_context, HOST, SUBSCRIBER_PORT)
# Start subscriber
subscriber.start()
# Stop and join subscriber
subscriber.close()
subscriber.join()
订户线程似乎在 datum = self.socket.recv()
阻塞,这让我认为这可能是套接字创建的一些问题。但是,它似乎仅在与订户合作时才有效。请求者似乎在这两种情况下都有效。此外,只需注释掉处理 requester
.
的两行,一切就会顺利进行
对于代码墙,我深表歉意,但目前我什至无法缩小问题出在哪些代码范围内。当我这样做时,我会删除不相关的代码。处理传入数据的测试代码已被删除。
澄清一下,我使用的是 Python 2.7 和 PyZMQ 14.3.1。
更新: 似乎 运行 MZLSubscriber
在主线程中而不是创建另一个 Process
导致预期的结果,所以看起来这可能是某种线程安全。据我所知,zmq 上下文是线程安全的,但套接字不是。我认为这不会引起问题,因为我明确确保每个线程都有一个套接字。
更新 2: 如果在 MZLSubscriber
中设置套接字的调用从 run()
移动到 __init__
,则套接字似乎收到一小部分已发布的消息,但确实有错误:
Process MZLSubscriber-1:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/Users/user/Repos/midas-client/client/midasclient/mzlSubscriber.py", line 45, in run
datum = self.socket.recv()
File "socket.pyx", line 628, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5616)
File "socket.pyx", line 662, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5436)
File "socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1771)
File "checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6032)
ZMQError: Interrupted system call
我已经通过在 MZLSubscriber.run()
中创建一个新的 zmq.Context
来解决这个问题,尽管我觉得如果 zmq 上下文是线程安全的,这就没有必要了。
看来我的问题是在不同进程上使用多个 zmq 上下文。虽然 PyZMQ 文档指出 zmq 上下文是线程安全的,但我只能假设它意味着 Python 个线程而不是进程。这非常令人困惑,因为在 C 中,zmq 上下文是线程安全的,尽管 运行 类似于 Python multiprocessing.Process
.
问题已通过为每个进程创建 zmq 上下文解决。
我正在使用 PyZMQ,我遇到了一个似乎相当特殊的问题。我有两个 classes 正在包装用于通信的套接字,MZLSubscriber
和 MZLRequester
。有一个 class 包含了它们,MZLLink
。对于其中的每一个,我还进行了测试 MZLSubscriberTest
、MZLRequesterTest
和 MZLinkTest
。订阅者和请求者的测试按预期工作,但 MZLinkTest
没有收到任何订阅者消息。
下面似乎是相关代码,它们是 3 个 class 的构造函数以及 MZLSubscriber
的 run()
和 [=26= 的测试] 和 MZLSubscriber
.
MZLink
构造函数:
# Host information
self.host = host
self.requestPort = requestPort
self.subscriberPort = subscriberPort
# Set up zmq context
self.zmq_context = zmq.Context()
# Set up subscriber and replier
self.subscriber = MZLSubscriber(self.zmq_context, self.host, self.subscriberPort)
self.requester = MZLRequester(self.zmq_context, self.host, self.requestPort)
# Start subscriber
self.subscriber.start()
MZLink
测试:
# Constants
HOST = "localhost"
REQ_PORT = 5555
SUB_PORT = 5556
# Create Link
link = MIDASZMQLink(HOST, REQ_PORT, SUB_PORT)
link.close()
MZLRequester
构造函数:
# Initialize class member variables
self.zmq_context = zmq_context
self.host = host
self.port = port
# Set up reply socket
self.socket = self.zmq_context.socket(zmq.REQ)
# Connect socket
self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))
MZLSubscriber
构造函数:
# Initialize parent process
Process.__init__(self)
# Store zmq context and connection host/port
self.zmq_context = zmq_context
self.host = host
self.port = port
# Sockets. Don't set them up here because sockets are not thread safe
self.socket = None
# Queue to store data in
# TODO: Make queue not overflow if events come in too quickly
self.queue = Queue()
MZLSubscriber.run()
:
# Parent call
Process.run(self)
# Set up subscriber socket in this thread
self.socket = self.zmq_context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE, unicode())
# Connect socket
self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))
# While the thread is alive, poll for data to put into queue
# Calling MZLSubscriber.stop() will automatically change this
while self.is_alive():
datum = self.socket.recv()
self.queue.put(datum)
# Disconnect and close socket.
#FIXME: Doesn't get here because terminate() immediately stops the process
self.socket.disconnect("tcp://{0}:{1}".format(self.host, self.port))
self.socket.close()
MZLSubscriber
测试:
# Host information
HOST = "localhost"
SUBSCRIBER_PORT = "5556"
# Set up zmq context
zmq_context = zmq.Context()
# Set up subscriber
subscriber = MZLSubscriber(zmq_context, HOST, SUBSCRIBER_PORT)
# Start subscriber
subscriber.start()
# Stop and join subscriber
subscriber.close()
subscriber.join()
订户线程似乎在 datum = self.socket.recv()
阻塞,这让我认为这可能是套接字创建的一些问题。但是,它似乎仅在与订户合作时才有效。请求者似乎在这两种情况下都有效。此外,只需注释掉处理 requester
.
对于代码墙,我深表歉意,但目前我什至无法缩小问题出在哪些代码范围内。当我这样做时,我会删除不相关的代码。处理传入数据的测试代码已被删除。
澄清一下,我使用的是 Python 2.7 和 PyZMQ 14.3.1。
更新: 似乎 运行 MZLSubscriber
在主线程中而不是创建另一个 Process
导致预期的结果,所以看起来这可能是某种线程安全。据我所知,zmq 上下文是线程安全的,但套接字不是。我认为这不会引起问题,因为我明确确保每个线程都有一个套接字。
更新 2: 如果在 MZLSubscriber
中设置套接字的调用从 run()
移动到 __init__
,则套接字似乎收到一小部分已发布的消息,但确实有错误:
Process MZLSubscriber-1:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/Users/user/Repos/midas-client/client/midasclient/mzlSubscriber.py", line 45, in run
datum = self.socket.recv()
File "socket.pyx", line 628, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5616)
File "socket.pyx", line 662, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5436)
File "socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1771)
File "checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6032)
ZMQError: Interrupted system call
我已经通过在 MZLSubscriber.run()
中创建一个新的 zmq.Context
来解决这个问题,尽管我觉得如果 zmq 上下文是线程安全的,这就没有必要了。
看来我的问题是在不同进程上使用多个 zmq 上下文。虽然 PyZMQ 文档指出 zmq 上下文是线程安全的,但我只能假设它意味着 Python 个线程而不是进程。这非常令人困惑,因为在 C 中,zmq 上下文是线程安全的,尽管 运行 类似于 Python multiprocessing.Process
.
问题已通过为每个进程创建 zmq 上下文解决。