.select() 可以在传统套接字和 ZeroMQ 套接字上使用吗?
Can .select() on both traditional and ZeroMQ sockets?
我有一个线程等待通过 ZMQ 的信号输入和通过 TCP 的网络 IO(另一个线程使用 UDP 做同样的事情)。
socket_tcp = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
socket_tcp.connect( ( self.config.tcp_ip, self.config.tcp_port ) )
while True:
s_ready = zmq.select( [socket_zmq, socket_tcp], [], [] )[0]
for sock in s_ready:
# do stuff
但是,TCP 套接字永远不会返回为可写(我确保它实际上使用 Wireshark 获取数据)。
文档说我可以通过
A zmq.Socket
or any Python object having a fileno()
method that returns a valid file descriptor.
而且我猜后者对 socket.socket
也是正确的。
UDP也一样
我是不是漏掉了什么?
我是否需要在单独的线程中处理 ZMQ 套接字并使用 ctrl_rcv, ctrl_snd = multiprocessing.Pipe()
传递其消息?
或者我可以像我期望的那样在两个世界上使用 select
吗?
不要恐慌(还)
嗯,虽然最近的 ZeroMQ API 确实提供了 ZMQ_FD
-file-descriptor 代理接触 ZeroMQ 套接字实例,这些实例只是“hidden”-iceberg 的一角,其行为方式不同于任何其他实例 "common"-套接字抽象,如已发布
最佳
至少被警告
[] 部分中的主要概念差异。
ZeroMQ 框架工具的强大之处在于其简洁的设计,因此几乎可以肯定,如果只使用 "side-by-side"(在任何 geeky syntax-constructor trick ) 与任何主要方式更简单 ( primitive, 如果有人愿意的话) 信号/消息传递设备。
如果到目前为止已经付出所有努力来实现 ZeroMQ 信号基础设施,那么不使用它就是胡说八道。
因此,宁可通过 ZeroMQ 更好地传递线程间消息传递,最好使用零拷贝智能 inproc://
传输-class 并避免任何平台-特定开销 a multiprocessing.Pipe()
将额外创建仅用于移动(...和复制)已知数据片段(ZeroMQ 可能会享受超低延迟权,因为零-复制,如适用)。
在尝试使用 asyncio 之后,我发现 zmq.asyncio
非常适合我的用例。它使用 zmq.Poller()
(zmq.select
也是如此)。但出于某种原因它只是工作...
所以我最终使用了 ZMQSelector()
和几行额外的代码:
import selectors
from zmq.asyncio import ZMQSelector
ctx = zmq.Context()
mixed_selector = ZMQSelector()
mixed_selector.register(sock_tcp, selectors.EVENT_READ)
mixed_selector.register(sock_zmq, selectors.EVENT_READ)
while True:
fd_event_list = mixed_selector.select()
for key, event in fd_event_list:
sock = key.fileobj
# do stuff
我有一个线程等待通过 ZMQ 的信号输入和通过 TCP 的网络 IO(另一个线程使用 UDP 做同样的事情)。
socket_tcp = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
socket_tcp.connect( ( self.config.tcp_ip, self.config.tcp_port ) )
while True:
s_ready = zmq.select( [socket_zmq, socket_tcp], [], [] )[0]
for sock in s_ready:
# do stuff
但是,TCP 套接字永远不会返回为可写(我确保它实际上使用 Wireshark 获取数据)。
文档说我可以通过
A
zmq.Socket
or any Python object having afileno()
method that returns a valid file descriptor.
而且我猜后者对 socket.socket
也是正确的。
UDP也一样
我是不是漏掉了什么?
我是否需要在单独的线程中处理 ZMQ 套接字并使用 ctrl_rcv, ctrl_snd = multiprocessing.Pipe()
传递其消息?
或者我可以像我期望的那样在两个世界上使用 select
吗?
不要恐慌(还)
嗯,虽然最近的 ZeroMQ API 确实提供了 ZMQ_FD
-file-descriptor 代理接触 ZeroMQ 套接字实例,这些实例只是“hidden”-iceberg 的一角,其行为方式不同于任何其他实例 "common"-套接字抽象,如已发布
最佳
至少被警告
[
ZeroMQ 框架工具的强大之处在于其简洁的设计,因此几乎可以肯定,如果只使用 "side-by-side"(在任何 geeky syntax-constructor trick ) 与任何主要方式更简单 ( primitive, 如果有人愿意的话) 信号/消息传递设备。
如果到目前为止已经付出所有努力来实现 ZeroMQ 信号基础设施,那么不使用它就是胡说八道。
因此,宁可通过 ZeroMQ 更好地传递线程间消息传递,最好使用零拷贝智能 inproc://
传输-class 并避免任何平台-特定开销 a multiprocessing.Pipe()
将额外创建仅用于移动(...和复制)已知数据片段(ZeroMQ 可能会享受超低延迟权,因为零-复制,如适用)。
在尝试使用 asyncio 之后,我发现 zmq.asyncio
非常适合我的用例。它使用 zmq.Poller()
(zmq.select
也是如此)。但出于某种原因它只是工作...
所以我最终使用了 ZMQSelector()
和几行额外的代码:
import selectors
from zmq.asyncio import ZMQSelector
ctx = zmq.Context()
mixed_selector = ZMQSelector()
mixed_selector.register(sock_tcp, selectors.EVENT_READ)
mixed_selector.register(sock_zmq, selectors.EVENT_READ)
while True:
fd_event_list = mixed_selector.select()
for key, event in fd_event_list:
sock = key.fileobj
# do stuff