如何用PUB/SUB来防止buffering/latency?
How to prevent buffering/latency with PUB/SUB?
我正在将视频作为一系列图像(等于 zmq 消息)发送,但有时,也许当网络速度较慢时,接收它们的速度比发送它们的速度慢,并且出现越来越长的延迟,似乎是到大约一分钟的视频或数百张图像或数兆字节的数据。它通常最终会自行清除,订阅者接收消息的速度比发布者发送消息的速度更快。
相反,如果订阅者 recv
发送它们太慢,我希望它以与预期相同的方式丢弃错过的消息。我希望 zmq.CONFLATE
=1 会这样做,但事实并非如此。那么如何?我怀疑它们在发布者处被缓冲,发布者不应该有任何 zmq 缓冲区,或者以某种方式在网络堆栈中。
简化的服务器代码
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:12345")
camera = PiCamera()
stream = io.BytesIO()
for _ in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
stream.truncate()
stream.seek(0)
socket.send(stream.read())
stream.seek(0)
简化客户端代码
# Initialization
self.context = zmq.Context()
self.video_socket = self.context.socket(zmq.SUB)
self.video_socket.setsockopt(zmq.CONFLATE, 1)
self.video_socket.setsockopt(zmq.SUBSCRIBE, b"")
self.video_socket.connect("tcp://" + ip_address + ":12345")
def get_image(self):
# Receive the latest image
poll_result = self.video_socket.poll(timeout=0)
if poll_result == zmq.POLLIN:
return self.video_socket.recv()
else:
return None
发布者在 Raspberry Pi,订阅者在 Windows。
我不确定您使用的 python zmq 是哪个版本,但基于底层的 c++ libzmq,您需要:
- 在服务器套接字上设置 ZMQ_SNDHWM 套接字选项
- 在客户端套接字上设置 ZMQ_RCVHWM 套接字选项。
在 pub/sub 的情况下,这些选项限制每个已完成连接排队的消息数。如果队列增长大于 HWM(高水位线),消息将被丢弃。
同时关闭合并,因为这会干扰这些选项。
同时在服务器上设置 zmq.CONFLATE
=1 以仅保留发送队列中的最新邮件。
绑定服务器套接字之前
socket.setsockopt(zmq.CONFLATE, 1)
出于某种原因,我误以为 PUB 套接字没有发送队列,但它有。
我正在将视频作为一系列图像(等于 zmq 消息)发送,但有时,也许当网络速度较慢时,接收它们的速度比发送它们的速度慢,并且出现越来越长的延迟,似乎是到大约一分钟的视频或数百张图像或数兆字节的数据。它通常最终会自行清除,订阅者接收消息的速度比发布者发送消息的速度更快。
相反,如果订阅者 recv
发送它们太慢,我希望它以与预期相同的方式丢弃错过的消息。我希望 zmq.CONFLATE
=1 会这样做,但事实并非如此。那么如何?我怀疑它们在发布者处被缓冲,发布者不应该有任何 zmq 缓冲区,或者以某种方式在网络堆栈中。
简化的服务器代码
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:12345")
camera = PiCamera()
stream = io.BytesIO()
for _ in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
stream.truncate()
stream.seek(0)
socket.send(stream.read())
stream.seek(0)
简化客户端代码
# Initialization
self.context = zmq.Context()
self.video_socket = self.context.socket(zmq.SUB)
self.video_socket.setsockopt(zmq.CONFLATE, 1)
self.video_socket.setsockopt(zmq.SUBSCRIBE, b"")
self.video_socket.connect("tcp://" + ip_address + ":12345")
def get_image(self):
# Receive the latest image
poll_result = self.video_socket.poll(timeout=0)
if poll_result == zmq.POLLIN:
return self.video_socket.recv()
else:
return None
发布者在 Raspberry Pi,订阅者在 Windows。
我不确定您使用的 python zmq 是哪个版本,但基于底层的 c++ libzmq,您需要:
- 在服务器套接字上设置 ZMQ_SNDHWM 套接字选项
- 在客户端套接字上设置 ZMQ_RCVHWM 套接字选项。
在 pub/sub 的情况下,这些选项限制每个已完成连接排队的消息数。如果队列增长大于 HWM(高水位线),消息将被丢弃。
同时关闭合并,因为这会干扰这些选项。
同时在服务器上设置 zmq.CONFLATE
=1 以仅保留发送队列中的最新邮件。
绑定服务器套接字之前
socket.setsockopt(zmq.CONFLATE, 1)
出于某种原因,我误以为 PUB 套接字没有发送队列,但它有。