如何使用pyzmq正确发布和订阅最新消息?
How to publish and subscribe the latest message correctly using pyzmq?
我有一个进程A,不断发布消息,进程B和C订阅主题,获取进程A发布者发布的最新消息。
因此,我将 zmq.CONFLATE
设置为发布者和订阅者。但是,我发现有一位订阅者收不到消息。
def publisher(sleep_time=1.0, port="5556"):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind("tcp://*:%s" % port)
print ("Running publisher on port: ", port)
while True:
localtime = time.asctime( time.localtime(time.time()))
string = "Message published time: {}".format(localtime)
socket.send_string("{}".format(string))
time.sleep(sleep_time)
def subscriber(name="sub", sleep_time=1, ports="5556"):
print ("Subscriber Name: {}, Sleep Time: {}, Port: {}".format(name, sleep_time, ports))
context = zmq.Context()
print ("Connecting to publisher with ports %s" % ports)
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt_string(zmq.SUBSCRIBE, "")
socket.connect ("tcp://localhost:%s" % ports)
while True:
message = socket.recv()
localtime = time.asctime( time.localtime(time.time()))
print ("\nSubscriber [{}]\n[RECV]: {} at [TIME]: {}".format(name, message, localtime))
time.sleep(sleep_time)
if __name__ == "__main__":
Process(target=publisher).start()
Process(target=subscriber, args=("SUB1", 1.2, )).start()
Process(target=subscriber, args=("SUB2", 1.1, )).start()
我试图在发布者中取消设置 socket.setsockopt(zmq.CONFLATE, 1)
,这似乎解决了问题。进程B和C的订阅者都能收到消息,消息好像是最新的。
我试图找出为什么将发布者设置为 CONFLATE
会导致我遇到的问题。我找不到有关它的信息。有谁知道是什么导致了这种行为?
另外,我想知道,在一个发布者对多个订阅者的情况下,正确的代码设置是什么,才能让订阅者始终获得最新消息?
很可能是计时问题,ZMQ_CONFLATE 套接字选项将 入站 和出站队列限制为 1 条消息。
PUB/SUB 的工作方式是当您设置 ZMQ_SUBSCRIBE 选项时订阅者向发布者发送订阅消息。如果同时启动两个订阅者,则到达发布者队列的订阅消息之一可能会被丢弃。
尝试在开始每个订阅者之间添加一个睡眠。
来自 zeromq 文档
If set, a socket shall keep only one message in its inbound/outbound
queue, this message being the last message received/the last message
to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not
support multi-part messages, in particular, only one part of it is
kept in the socket internal queue.
我并不是说这是您问题的解决方案,但如果是这种情况,我们可能需要 post 更改 libzmq 以使合并选项更细化,以便您可以选择是否应该合并应用于入站或出站队列。
ZMQ
订阅套接字(使用 CONFLATE
选项)中有一种方法可以获取“仅最后一条消息”选项。
订阅方需要它。
这是一个例子:
import zmq
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1) # last msg only.
socket.connect("tcp://localhost:%s" % port) # must be placed after above options.
while True:
data = socket.recv()
print data
换句话说,我删除了订阅者代码中的所有缓冲队列。
[额外]:
使用 zmq.SNDBUF
和 zmq.RCVBUF
选项,我们可以设置 ZMQ 缓冲区大小的限制。 (More complete and an example)
我有一个进程A,不断发布消息,进程B和C订阅主题,获取进程A发布者发布的最新消息。
因此,我将 zmq.CONFLATE
设置为发布者和订阅者。但是,我发现有一位订阅者收不到消息。
def publisher(sleep_time=1.0, port="5556"):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind("tcp://*:%s" % port)
print ("Running publisher on port: ", port)
while True:
localtime = time.asctime( time.localtime(time.time()))
string = "Message published time: {}".format(localtime)
socket.send_string("{}".format(string))
time.sleep(sleep_time)
def subscriber(name="sub", sleep_time=1, ports="5556"):
print ("Subscriber Name: {}, Sleep Time: {}, Port: {}".format(name, sleep_time, ports))
context = zmq.Context()
print ("Connecting to publisher with ports %s" % ports)
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt_string(zmq.SUBSCRIBE, "")
socket.connect ("tcp://localhost:%s" % ports)
while True:
message = socket.recv()
localtime = time.asctime( time.localtime(time.time()))
print ("\nSubscriber [{}]\n[RECV]: {} at [TIME]: {}".format(name, message, localtime))
time.sleep(sleep_time)
if __name__ == "__main__":
Process(target=publisher).start()
Process(target=subscriber, args=("SUB1", 1.2, )).start()
Process(target=subscriber, args=("SUB2", 1.1, )).start()
我试图在发布者中取消设置 socket.setsockopt(zmq.CONFLATE, 1)
,这似乎解决了问题。进程B和C的订阅者都能收到消息,消息好像是最新的。
我试图找出为什么将发布者设置为 CONFLATE
会导致我遇到的问题。我找不到有关它的信息。有谁知道是什么导致了这种行为?
另外,我想知道,在一个发布者对多个订阅者的情况下,正确的代码设置是什么,才能让订阅者始终获得最新消息?
很可能是计时问题,ZMQ_CONFLATE 套接字选项将 入站 和出站队列限制为 1 条消息。
PUB/SUB 的工作方式是当您设置 ZMQ_SUBSCRIBE 选项时订阅者向发布者发送订阅消息。如果同时启动两个订阅者,则到达发布者队列的订阅消息之一可能会被丢弃。
尝试在开始每个订阅者之间添加一个睡眠。
来自 zeromq 文档
If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.
我并不是说这是您问题的解决方案,但如果是这种情况,我们可能需要 post 更改 libzmq 以使合并选项更细化,以便您可以选择是否应该合并应用于入站或出站队列。
ZMQ
订阅套接字(使用 CONFLATE
选项)中有一种方法可以获取“仅最后一条消息”选项。
订阅方需要它。
这是一个例子:
import zmq
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1) # last msg only.
socket.connect("tcp://localhost:%s" % port) # must be placed after above options.
while True:
data = socket.recv()
print data
换句话说,我删除了订阅者代码中的所有缓冲队列。
[额外]:
使用 zmq.SNDBUF
和 zmq.RCVBUF
选项,我们可以设置 ZMQ 缓冲区大小的限制。 (More complete and an example)