ZMQ 丢弃旧消息

ZMQ drop old messages

我正在尝试创建一个现实生活中的系统,其中订阅者需要对发布者提供的实时数据执行操作。有时 PUB 和 SUB 会不同步(最多 10 秒),因为它们正在执行某些操作,而我总是需要来自发布者的最新数据,否则订阅者执行的操作将会偏离。

我正在尝试使用 SUB/PUB 方法并尝试设置 HWM 限制,但它似乎不起作用。 我已经尝试过断开连接的方法,但它会在系统中增加一秒的延迟,而且我的系统 90% 的时间都是实时工作的,所以通过使用断开连接整个系统会崩溃。

订阅者(我试图通过time.sleep()来模拟实际系统):

import time
import zmq
import random

context = zmq.Context()
consumer_receiver = context.socket(zmq.SUB)

consumer_receiver.set_hwm(0)
consumer_receiver.connect("tcp://127.0.0.1:5555")

consumer_receiver.subscribe(b'')


while 1:
    d=random.randint(0,10)

    work = consumer_receiver.recv_pyobj()
    # consumer_receiver.disconnect()
    print(work,"  :",d)
    time.sleep(d)

出版商:

import time
import zmq

context = zmq.Context()
zmq_socket = context.socket(zmq.PUB)
zmq_socket.bind("tcp://127.0.0.1:5555")

for x in range(1000):

    # zmq_socket.send_string("", zmq.SNDMORE)
    zmq_socket.send_pyobj(x,zmq.NOBLOCK)
    time.sleep(1)
    print(x)

OK我的救星是CONFLATE. Thanks to thispost问题好像解决了

import time
import zmq
import random

context = zmq.Context()
consumer_receiver = context.socket(zmq.SUB)    

consumer_receiver.setsockopt(zmq.CONFLATE, 1)

consumer_receiver.connect("tcp://127.0.0.1:5555") 
consumer_receiver.subscribe(b'')


while 1:
    d=random.randint(1,10)

    work = consumer_receiver.recv_pyobj()

    print(work,"  :",d)

    time.sleep(d)