我需要为 ZMQ 上的每个订阅创建新的套接字吗?

Do I need to create new Socket for each SUBscription on ZMQ?

服务器循环遍历对象列表,这些对象上的数据实时变化。服务器每毫秒发布这些对象的所有新数据。即 ['Carrot', 'Banana', 'Mango', 'Eggplant']

客户可以通过他们的名字订阅特定的对象。 self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'Carrot')
在线程上,客户端也实时轮询这些数据:

while True:
    sockets = dict(self.poller.poll(poll_timeout))
    if self.sub_socket in sockets and sockets[self.sub_socket] == zmq.POLLIN:
        msg = self.sub_socket.recv_string(zmq.DONTWAIT)

        // do something with the msg...

问题是当我订阅多个对象时 Carrot, Eggplant & Banana。我只收到来自 Carrot 的更改,有时 Banana,很少收到 Eggplant 的更改。我认为这是因为从服务器的循环顺序来看,比如当客户端轮询、接收 Carrot、处理数据、然后再次轮询时,服务器已经完成了通过列表的发布,然后再次发布 Carrot,然后客户端民意调查因此只收到胡萝卜。

所以我想为每个订阅创建单独的套接字?那是一个解决方案吗?我是 ZMQ 的新手。

Q : "Is that a solution?" ... creating individual sockets for each subscription?

否。除非出于某些我不知道的其他原因。


虽然 ZeroMQ 消息传递基础设施为每条消息传递提供零保证,但这并不意味着消息在发送后会消失或丢失。它只是说,期望对每一个交付的零保修,如果需要,可以添加这种想要的保修机制开销,如果其他人可以在没有它们的情况下工作,则无需支付。损失一百万分之一? 1 合 1.000.000.000?这取决于许多因素,但丢失消息并不是分布式计算系统的常见或随机状态(并且有一些内部原因,其详细信息超出了本 post 的范围)。


还有疑问吗?

进行测试。

设计一个简单的测试-PUB端发送一条均匀分布的琐碎消息

SAMPLEs  = int( 1E6 )
aMsgSIZE = 2048
TOPICs   = [ r'Carrot', r'Banana', r'Mango', r'Eggplant', r'' ]
MASK     = "{0:}" + aMsgSIZE * "_"

for i in range( SAMPLEs ):
    PUB.send( MASK.format( TOPICs[np.random.randint( len( TOPICs ) - 1 ) ) )
    time.sleep( 1E-3 )

使用此测试,您将收到均匀分布的样本,其中每个订阅的数量相同 TOPICs(如果全部订阅)。

增加 aMsgSIZE 可能(在默认 Context()- 和 Socket()- 实例下)创建一些消息以“丢失”,但同样,这应该是均匀分布的。否则,再深入挖掘会有些麻烦。

统一未从 SAMPLEs 数量传送的消息数量将表明需要调整 Context()Socket()-实例的参数以提供资源的需要有多大足以安全地排队该数量的数据流。然而,为个人订阅 Topic-strings 提供更多 Socket()-s 不会解决此资源管理瓶颈(如果存在)。

不要犹豫 post 测试结果,如果主题的均匀分布组合有或没有倾斜,最后没有收到多少分数。

添加平台详细信息 + ZeroMQ 版本,一切都很重要,一如既往:o)