创建线程化 zeromq 套接字的正确方法是什么?
What is correct way of creating threaded zeromq socket?
我想知道如何正确创建后台线程来监听某个随机端口并将接收到的对象推送到队列?
我希望我的套接字包装器启动新线程,select一些随机端口并开始监听。我必须能够从套接字包装器中获取此端口号。
我想出了简单的 class:
class SocketWrapper(Thread):
def __init__(self, socket_type, *args, **kwargs):
super(Thread, self).__init__(*args, **kwargs)
self._ctx = zmq.Context()
self._socket = self._ctx._socket(socket_type)
self.port = self._socket.bind_to_random_port('tcp://*')
self._queue = Queue()
def run(self):
while not self.stop_requested:
try:
item = socket.recv_pyobj(flags=zmq.NOBLOCK)
self._queue.put(item)
except ZMQError:
time.sleep(0.01) # Wait a little for next item to arrive
但是,zmq 套接字不能在线程之间共享,它们不是线程安全的 (http://api.zeromq.org/2-1:zmq)。所以socket的创建和绑定应该移到run()
方法:
class SocketWrapper2(Thread):
def __init__(self, socket_type, *args, **kwargs):
super(Thread, self).__init__(*args, **kwargs)
self._socket_type = socket_type
self._ctx = zmq.Context()
self._queue = Queue()
self._event = Event()
def run(self):
socket = self._ctx._socket(self._socket_type)
self.port = self._socket.bind_to_random_port('tcp://*')
self._event.set()
while not self.stop_requested:
try:
item = socket.recv_pyobj(flags=zmq.NOBLOCK)
self._queue.put(item)
except ZMQError:
time.sleep(0.01) # Wait a little for next item to arrive
def get_port(self):
self._event.wait()
return self.port
我必须添加事件以确保端口在我可以读取之前已经绑定,但是当在 start() 之前调用 SocketWrapper2.get_port() 时,它会引入死锁风险。这可以通过使用 Thread 的 _started 事件来避免:
def get_port(self):
if not self._started.is_set():
raise RuntimeError("You can't call run_port before thread start.")
self._event.wait()
return self.port
这终于是线程安全的了吗?还有什么需要注意的吗?
我仍然在这里看到的问题是我想在创建 SocketWrapper 后立即获取端口。我可以在 __init__
中安全地调用 Thread 的 start()
吗?
我最后稍微修改了这个解决方案以避免主线程死锁:
def get_port(self):
if not self._started.is_set():
raise RuntimeError("You can't call run_port before thread start.")
if not self._event.wait(1):
raise RuntimeError("Couldn't get port after a while.")
return self.port
这并不完美。由于我们延迟 get_port 但它很简单并且可以完成工作。有什么改进建议吗?
我想知道如何正确创建后台线程来监听某个随机端口并将接收到的对象推送到队列?
我希望我的套接字包装器启动新线程,select一些随机端口并开始监听。我必须能够从套接字包装器中获取此端口号。
我想出了简单的 class:
class SocketWrapper(Thread):
def __init__(self, socket_type, *args, **kwargs):
super(Thread, self).__init__(*args, **kwargs)
self._ctx = zmq.Context()
self._socket = self._ctx._socket(socket_type)
self.port = self._socket.bind_to_random_port('tcp://*')
self._queue = Queue()
def run(self):
while not self.stop_requested:
try:
item = socket.recv_pyobj(flags=zmq.NOBLOCK)
self._queue.put(item)
except ZMQError:
time.sleep(0.01) # Wait a little for next item to arrive
但是,zmq 套接字不能在线程之间共享,它们不是线程安全的 (http://api.zeromq.org/2-1:zmq)。所以socket的创建和绑定应该移到run()
方法:
class SocketWrapper2(Thread):
def __init__(self, socket_type, *args, **kwargs):
super(Thread, self).__init__(*args, **kwargs)
self._socket_type = socket_type
self._ctx = zmq.Context()
self._queue = Queue()
self._event = Event()
def run(self):
socket = self._ctx._socket(self._socket_type)
self.port = self._socket.bind_to_random_port('tcp://*')
self._event.set()
while not self.stop_requested:
try:
item = socket.recv_pyobj(flags=zmq.NOBLOCK)
self._queue.put(item)
except ZMQError:
time.sleep(0.01) # Wait a little for next item to arrive
def get_port(self):
self._event.wait()
return self.port
我必须添加事件以确保端口在我可以读取之前已经绑定,但是当在 start() 之前调用 SocketWrapper2.get_port() 时,它会引入死锁风险。这可以通过使用 Thread 的 _started 事件来避免:
def get_port(self):
if not self._started.is_set():
raise RuntimeError("You can't call run_port before thread start.")
self._event.wait()
return self.port
这终于是线程安全的了吗?还有什么需要注意的吗?
我仍然在这里看到的问题是我想在创建 SocketWrapper 后立即获取端口。我可以在 __init__
中安全地调用 Thread 的 start()
吗?
我最后稍微修改了这个解决方案以避免主线程死锁:
def get_port(self):
if not self._started.is_set():
raise RuntimeError("You can't call run_port before thread start.")
if not self._event.wait(1):
raise RuntimeError("Couldn't get port after a while.")
return self.port
这并不完美。由于我们延迟 get_port 但它很简单并且可以完成工作。有什么改进建议吗?