ZeroMQ XPUB recv() 是一种查找是否有订阅者并解决慢连接综合症的解决方案吗?

Is ZeroMQ XPUB recv() a solution for finding if there is a subscriber and solve the slow joiner syndrome?

我的用例:

  1. 订阅者将是一个服务器(绑定到一个端口),它将等待来自多个发布者的消息。
  2. Publishers 将在不同的线程中初始化为客户端(连接到端口)。
  3. 要在每个线程中发布的数据将是几条消息。
  4. 订阅者连接后尽快收到每条消息很重要。
  5. 如果订阅者未连接,那么我不想让发布者线程处于阻塞状态,理想情况下有 1-2 秒左右的超时它会起作用。

慢速连接器问题:

运行 超过 1000 个线程(发布者)只有 1 或 2 次我在 Subscriber 中获取所有数据。 添加几毫秒的睡眠可以解决问题,所以我 99.9% 确定我是众所周知的慢连接综合症的受害者。然而,就我而言,睡眠解决方案并不是一个好的解决方案,因为发布者的连接时间可能是可变的,我希望尽快将数据发送给订阅者。

我解决这个问题的思路和实验代码:

我的解决方案是基于使用 XPUB recv 方法。使用 XPUB 初始化发布者并将 RCVTIMEO 设置为 1000ms。发布者连接后,我添加了一个 recv() 调用来检查是否有订阅者。当我收到订阅消息时,我知道连接已经完成并且我可以发送数据而不会丢失任何数据(除非订阅者发生错误但我不在乎)。

如果我没有收到任何订阅消息,那么在 1000 毫秒后 recv() 超时并且线程终止。

这是 python(pyzmq) 中的示例代码,用于测试此实现(对于发布者,我不使用线程,而是使用 while 循环和 运行 同时发布多个发布者)和它按照我的意愿工作:

publisher.py:

import zmq

def main():
    """ main method """

    i = 0
    while True:
        # Prepare context and publisher
        context = zmq.Context()
        publisher = context.socket(zmq.XPUB)
        publisher.connect("tcp://0.0.0.0:5650")
        publisher.setsockopt(zmq.RCVTIMEO, 1000)

        # Waiting for 1000ms to get a subscription
        i = i + 1
        try:
            publisher.recv()
            # Send the message
            publisher.send_multipart([b'test', bytes(str(i),'utf-8')])
        except Exception as e:
            print(e, flush=True)

        # Terminate socket and context
        publisher.close()
        context.term()
        if i >= 10000:
            break

if __name__ == "__main__":
    main()    

subscriber.py:

import zmq

def main():
    """ main method """

    # Prepare our context and subscriber
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    uri = "tcp://0.0.0.0:5650"
    subscriber.bind(uri)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    print('Subscriber connects to %s' % (uri), flush=True)

    # Receive messages
    i = 0
    while True:
        [topic, data] = subscriber.recv_multipart()
        i = i + 1
        print("%s: %s %s" % (i, topic, data), flush=True)

if __name__ == "__main__":
    main()

我的问题:

有这么简单的解决方法吗?如果有订阅者处于活动状态,我是否遗漏了任何会导致数据丢失的内容(与慢速加入者相关)?

Q : "Is the solution that simple?"

恰恰相反。对于上面发布的内容,解决方案过于复杂 w.r.t。到目前为止发布的用例要求。

a) 鉴于上述要求,在线程之间进行通信时,可以消除与 ISO-OSI-L3 tcp:// 传输 Class 的设置和维护相关的所有成本-位于属于同一进程的同一主机上。而是采用超快、无堆栈、内存映射 inproc:// 传输 Class 来避免所有这些低效率。 ZeroMQ API v4.0+在设置inproc://-TransportClass { .bind() | .connect() }-顺序上也有无其他条件的便利,让我们尽情享受零拷贝“传输”消息的 MEM 映射超低延迟标记(无需移动 RAM 中的数据字节)- 很酷,不是吗? (除非您需要注入 MITM 协议嗅探,否则请删除 tcp:// 矫枉过正)

b) 考虑到上述要求,传递几条消息,其中“静态”SUB 端订阅了所有这些消息,这是非常重要的PUB/SUB 可扩展正式通信模式原型的低效使用。您的代码必须支付所有费用来设置新的 SUB-实例,然后它会爬行以设置有效连接(在 tcp://-TransportClass' 堆栈上,希望在 a 下删除) ),下一步是设置一个新的主题过滤器(无论是在早期版本的 SUB 端还是在较新的 ZeroMQ 版本中的 PUB 端运行——所有这些都付出了巨大的代价来接收所有消息——即根本没有过滤)。使用更轻量级的多节点-PUSH/PULL-在一个节点上的方式可以实现相同的正式服务。如果没有其他任何反向/双向/更复杂的正式通信的需要,这一个 PUSH/PULL 将能够完成所要求的工作。

c) 鉴于上述要求,您的重点似乎是通过连接过早地发送消息而不会丢失消息。在 ZeroMQ 设置中有用于确定这一点的工具,但您不小心使用它们:

  • 使用 zmq.IMMEDIATE 可以在没有现成的连接工作(或永远)的情况下使用 AccessNode 的阻塞状态
  • 使用 return 代码和 errno(或 zmq.errno() 用于 POSIX-不兼容的操作系统/Win32 等)处理可能有助于您的代码检测和做出反应在 的整个生命周期中,“自治代理网络”中发生的任何和所有特定情况(无论代理是否确实“物理上”分布或位于同一地点,就像这里的情况一样)。不失去控制是这里的核心责任。什么是控制代码,在失去控制状态下自锁,甚至无法控制自己;) ?

d) 永远不要使用阻塞形式的 { .recv() | .send() | .poll() | ... }-methods。教科书示例与专业的 signaling/messaging 元平面实现应该是什么样子的反模式相当。确实从来没有 - 参考。上述第 5 项。

e) 最好重新使用 Context()-实例,而不是像上面概述的那样将其设为 consumable/disposable。线程可以自由共享一个预先实例化的 Context() 引擎,避免接下来大量的重复性附加开销成本,如果每个分叉重新实例化一个 consumable/disposable Context(),只是一个简短的-lived,对等客户端线程。

f) 如果有人知道更好的解决方案,请随时通知我们 :o)

评论中的问题

a)
Subscriber will be on another machine, so I think tcp:// is the solution.*

当然,这里是 NP。 { pgm:// | epgm:// | tipc:// }-如果进一步向更高性能水平方向发展,传输可能在这里很有趣

b)
Subscriber is going to forward via an XPUB socket the messages to other subscribers. PUSH/PULL could work but if I want to pass those subscriptions and their filters to the initial publishers and filter some messages at the source, I have to use PUB/SUB pattern.

嗯,没提到的O/P。 XPUBs/XSUBs 的任何分层都可能工作良好,问题出在连接管理级别

c)
Just to clarify, not losing messages is important only when there is a subscriber. Could you explain this part a little more?

当然,在 RTO 连接的 link 上没有可用的订阅者,准备好“通过网络”立即传递,任何消息都无法传递(并且可能被静默丢弃,这就是你尝试对抗,不是吗?)。这就是 zmq.IMMEDIATE 可以通过调用 .setsockopt() 方法来管理的内容。