如何用PUB/SUB来防止buffering/latency?

How to prevent buffering/latency with PUB/SUB?

我正在将视频作为一系列图像(等于 zmq 消息)发送,但有时,也许当网络速度较慢时,接收它们的速度比发送它们的速度慢,并且出现越来越长的延迟,似乎是到大约一分钟的视频或数百张图像或数兆字节的数据。它通常最终会自行清除,订阅者接收消息的速度比发布者发送消息的速度更快。

相反,如果订阅者 recv发送它们太慢,我希望它以与预期相同的方式丢弃错过的消息。我希望 zmq.CONFLATE=1 会这样做,但事实并非如此。那么如何?我怀疑它们在发布者处被缓冲,发布者不应该有任何 zmq 缓冲区,或者以某种方式在网络堆栈中。

简化的服务器代码

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:12345")
camera = PiCamera()
stream = io.BytesIO()
for _ in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
  stream.truncate()
  stream.seek(0)
  socket.send(stream.read())
  stream.seek(0)

简化客户端代码

# Initialization
self.context = zmq.Context()
self.video_socket = self.context.socket(zmq.SUB)
self.video_socket.setsockopt(zmq.CONFLATE, 1)
self.video_socket.setsockopt(zmq.SUBSCRIBE, b"")
self.video_socket.connect("tcp://" + ip_address + ":12345")

def get_image(self):
  # Receive the latest image
  poll_result = self.video_socket.poll(timeout=0)
  if poll_result == zmq.POLLIN:
    return self.video_socket.recv()
  else:
    return None

发布者在 Raspberry Pi,订阅者在 Windows。

我不确定您使用的 python zmq 是哪个版本,但基于底层的 c++ libzmq,您需要:

  • 在服务器套接字上设置 ZMQ_SNDHWM 套接字选项
  • 在客户端套接字上设置 ZMQ_RCVHWM 套接字选项。

在 pub/sub 的情况下,这些选项限制每个已完成连接排队的消息数。如果队列增长大于 HWM(高水位线),消息将被丢弃。

同时关闭合并,因为这会干扰这些选项。

同时在服务器上设置 zmq.CONFLATE=1 以仅保留发送队列中的最新邮件。

绑定服务器套接字之前

socket.setsockopt(zmq.CONFLATE, 1)

出于某种原因,我误以为 PUB 套接字没有发送队列,但它有。