如何在 ZeroMQ 中以正确的方式中止 context.socket.recv()?
How to abort context.socket.recv() the right way in ZeroMQ?
我有一个小软件,其中有一个单独的线程正在等待 ZeroMQ 消息。我正在使用 ZeroMQ 的 PUB
/SUB
通信协议。
目前我正在通过将变量“cont_loop
”设置为 False
来中止该线程。
但我发现,当没有消息到达 ZeroMQ 订阅者时,我无法退出线程(不关闭整个程序)。
def __init__(self):
Thread.__init__(self)
self.cont_loop = True
def abort(self):
self.continue_loop = False
def run(self):
zmq_context = zmq.Context()
zmq_socket = zmq_context.socket(zmq.SUB)
zmq_socket.bind("tcp://*:%s" % *(5556))
zmq_socket.setsockopt(zmq.SUBSCRIBE, "")
while self.cont_loop:
data = zmq_socket.recv()
print "Message: " + data
zmq_socket.close()
zmq_context.term()
print "exit"
我试图将 socket.close()
和 context.term()
移动到中止方法。所以它关闭了订阅者,但这杀死了整个程序。
关闭上述程序的正确方法是什么?
问:...的正确方法是什么?
A: 有很多方法可以达到设定的目标。让我只挑一个,作为关于如何处理分布式进程到进程消息传递的模型示例。
首先。假设,在典型的软件设计任务中有更多的优先级。有的更高,有的更低,有的甚至如此之低,以至于可以推迟执行这些低优先级的子任务,这样调度程序中就有更多的时间来执行那些无法处理等待的子任务。
话虽如此,让我们查看您的代码。对 .recv()
的 SUB
侧指令被使用,导致两件事。一个可见的 - 它在具有 SUB
行为的 ZeroMQ 套接字上执行 RECEIVE 操作。第二个不太明显的是,它保持挂起状态,直到它获得具有 SUB
行为当前状态的东西 "compatible"(稍后会详细介绍)。
这意味着,自从这样的 .recv()
方法调用 UNTIL 以来,它也一直 BLOCKS 一些未知的、本地无法控制的巧合states/events 使其传递 ZeroMQ 消息,其内容为 "compatible" 并具有本地预设状态(仍在阻塞)SUB
-行为实例。
这可能需要很长时间。
这正是 .recv()
在控制循环中使用的原因,外部处理既有机会也有责任做你想做的事(包括与中止相关的操作和公平/优雅的以适当的资源释放终止。
接收过程变成了 .recv( flags = zmq.NOBLOCK )
而不是 try: except:
情节。这样一来,您的本地进程就不会失去对事件流的控制(包括 NOP 就是这样的)。
最好的下一步是什么?
花点时间阅读一本伟大的珍宝书,“Code Connected,第 1 卷”,ZeroMQ 之父 Pieter HINTJENS 已出版(也作为PDF).
他与我们分享的许多想法和需要避免的错误确实值得您花时间。
享受 ZeroMQ 的强大功能。它非常强大,值得自上而下掌握。
我有一个小软件,其中有一个单独的线程正在等待 ZeroMQ 消息。我正在使用 ZeroMQ 的 PUB
/SUB
通信协议。
目前我正在通过将变量“cont_loop
”设置为 False
来中止该线程。
但我发现,当没有消息到达 ZeroMQ 订阅者时,我无法退出线程(不关闭整个程序)。
def __init__(self):
Thread.__init__(self)
self.cont_loop = True
def abort(self):
self.continue_loop = False
def run(self):
zmq_context = zmq.Context()
zmq_socket = zmq_context.socket(zmq.SUB)
zmq_socket.bind("tcp://*:%s" % *(5556))
zmq_socket.setsockopt(zmq.SUBSCRIBE, "")
while self.cont_loop:
data = zmq_socket.recv()
print "Message: " + data
zmq_socket.close()
zmq_context.term()
print "exit"
我试图将 socket.close()
和 context.term()
移动到中止方法。所以它关闭了订阅者,但这杀死了整个程序。
关闭上述程序的正确方法是什么?
问:...的正确方法是什么?
A: 有很多方法可以达到设定的目标。让我只挑一个,作为关于如何处理分布式进程到进程消息传递的模型示例。
首先。假设,在典型的软件设计任务中有更多的优先级。有的更高,有的更低,有的甚至如此之低,以至于可以推迟执行这些低优先级的子任务,这样调度程序中就有更多的时间来执行那些无法处理等待的子任务。
话虽如此,让我们查看您的代码。对 .recv()
的 SUB
侧指令被使用,导致两件事。一个可见的 - 它在具有 SUB
行为的 ZeroMQ 套接字上执行 RECEIVE 操作。第二个不太明显的是,它保持挂起状态,直到它获得具有 SUB
行为当前状态的东西 "compatible"(稍后会详细介绍)。
这意味着,自从这样的 .recv()
方法调用 UNTIL 以来,它也一直 BLOCKS 一些未知的、本地无法控制的巧合states/events 使其传递 ZeroMQ 消息,其内容为 "compatible" 并具有本地预设状态(仍在阻塞)SUB
-行为实例。
这可能需要很长时间。
这正是 .recv()
在控制循环中使用的原因,外部处理既有机会也有责任做你想做的事(包括与中止相关的操作和公平/优雅的以适当的资源释放终止。
接收过程变成了 .recv( flags = zmq.NOBLOCK )
而不是 try: except:
情节。这样一来,您的本地进程就不会失去对事件流的控制(包括 NOP 就是这样的)。
最好的下一步是什么?
花点时间阅读一本伟大的珍宝书,“Code Connected,第 1 卷”,ZeroMQ 之父 Pieter HINTJENS 已出版(也作为PDF).
他与我们分享的许多想法和需要避免的错误确实值得您花时间。
享受 ZeroMQ 的强大功能。它非常强大,值得自上而下掌握。