ZeroMQ XPUB recv() 是一种查找是否有订阅者并解决慢连接综合症的解决方案吗?
Is ZeroMQ XPUB recv() a solution for finding if there is a subscriber and solve the slow joiner syndrome?
我的用例:
- 订阅者将是一个服务器(绑定到一个端口),它将等待来自多个发布者的消息。
- Publishers 将在不同的线程中初始化为客户端(连接到端口)。
- 要在每个线程中发布的数据将是几条消息。
- 订阅者连接后尽快收到每条消息很重要。
- 如果订阅者未连接,那么我不想让发布者线程处于阻塞状态,理想情况下有 1-2 秒左右的超时它会起作用。
慢速连接器问题:
运行 超过 1000 个线程(发布者)只有 1 或 2 次我在 Subscriber 中获取所有数据。
添加几毫秒的睡眠可以解决问题,所以我 99.9% 确定我是众所周知的慢连接综合症的受害者。然而,就我而言,睡眠解决方案并不是一个好的解决方案,因为发布者的连接时间可能是可变的,我希望尽快将数据发送给订阅者。
我解决这个问题的思路和实验代码:
我的解决方案是基于使用 XPUB recv 方法。使用 XPUB 初始化发布者并将 RCVTIMEO 设置为 1000ms。发布者连接后,我添加了一个 recv()
调用来检查是否有订阅者。当我收到订阅消息时,我知道连接已经完成并且我可以发送数据而不会丢失任何数据(除非订阅者发生错误但我不在乎)。
如果我没有收到任何订阅消息,那么在 1000 毫秒后 recv()
超时并且线程终止。
这是 python(pyzmq) 中的示例代码,用于测试此实现(对于发布者,我不使用线程,而是使用 while 循环和 运行 同时发布多个发布者)和它按照我的意愿工作:
publisher.py:
import zmq
def main():
""" main method """
i = 0
while True:
# Prepare context and publisher
context = zmq.Context()
publisher = context.socket(zmq.XPUB)
publisher.connect("tcp://0.0.0.0:5650")
publisher.setsockopt(zmq.RCVTIMEO, 1000)
# Waiting for 1000ms to get a subscription
i = i + 1
try:
publisher.recv()
# Send the message
publisher.send_multipart([b'test', bytes(str(i),'utf-8')])
except Exception as e:
print(e, flush=True)
# Terminate socket and context
publisher.close()
context.term()
if i >= 10000:
break
if __name__ == "__main__":
main()
subscriber.py:
import zmq
def main():
""" main method """
# Prepare our context and subscriber
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
uri = "tcp://0.0.0.0:5650"
subscriber.bind(uri)
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
print('Subscriber connects to %s' % (uri), flush=True)
# Receive messages
i = 0
while True:
[topic, data] = subscriber.recv_multipart()
i = i + 1
print("%s: %s %s" % (i, topic, data), flush=True)
if __name__ == "__main__":
main()
我的问题:
有这么简单的解决方法吗?如果有订阅者处于活动状态,我是否遗漏了任何会导致数据丢失的内容(与慢速加入者相关)?
Q : "Is the solution that simple?"
恰恰相反。对于上面发布的内容,解决方案过于复杂 w.r.t。到目前为止发布的用例要求。
a) 鉴于上述要求,在线程之间进行通信时,可以消除与 ISO-OSI-L3 tcp://
传输 Class 的设置和维护相关的所有成本-位于属于同一进程的同一主机上。而是采用超快、无堆栈、内存映射 inproc://
传输 Class 来避免所有这些低效率。 ZeroMQ API v4.0+在设置inproc://
-TransportClass { .bind() | .connect() }
-顺序上也有无其他条件的便利,让我们尽情享受零拷贝“传输”消息的 MEM 映射超低延迟标记(无需移动 RAM 中的数据字节)- 很酷,不是吗? (除非您需要注入 MITM 协议嗅探,否则请删除 tcp://
矫枉过正)
b) 考虑到上述要求,传递几条消息,其中“静态”SUB
端订阅了所有这些消息,这是非常重要的PUB/SUB
可扩展正式通信模式原型的低效使用。您的代码必须支付所有费用来设置新的 SUB
-实例,然后它会爬行以设置有效连接(在 tcp://
-TransportClass' 堆栈上,希望在 a 下删除) ),下一步是设置一个新的主题过滤器(无论是在早期版本的 SUB 端还是在较新的 ZeroMQ 版本中的 PUB 端运行——所有这些都付出了巨大的代价来接收所有消息——即根本没有过滤)。使用更轻量级的多节点-PUSH/PULL
-在一个节点上的方式可以实现相同的正式服务。如果没有其他任何反向/双向/更复杂的正式通信的需要,这一个 PUSH/PULL
将能够完成所要求的工作。
c) 鉴于上述要求,您的重点似乎是通过连接过早地发送消息而不会丢失消息。在 ZeroMQ 设置中有用于确定这一点的工具,但您不小心使用它们:
- 使用
zmq.IMMEDIATE
可以在没有现成的连接工作(或永远)的情况下使用 AccessNode 的阻塞状态
- 使用 return 代码和
errno
(或 zmq.errno()
用于 POSIX-不兼容的操作系统/Win32 等)处理可能有助于您的代码检测和做出反应在 distributed-computing 的整个生命周期中,“自治代理网络”中发生的任何和所有特定情况(无论代理是否确实“物理上”分布或位于同一地点,就像这里的情况一样)。不失去控制是这里的核心责任。什么是控制代码,在失去控制状态下自锁,甚至无法控制自己;) ?
d) 永远不要使用阻塞形式的 { .recv() | .send() | .poll() | ... }
-methods。教科书示例与专业的 signaling/messaging 元平面实现应该是什么样子的反模式相当。确实从来没有 - 参考。上述第 5 项。
e) 最好重新使用 Context()
-实例,而不是像上面概述的那样将其设为 consumable/disposable。线程可以自由共享一个预先实例化的 Context()
引擎,避免接下来大量的重复性附加开销成本,如果每个分叉重新实例化一个 consumable/disposable Context()
,只是一个简短的-lived,对等客户端线程。
f) 如果有人知道更好的解决方案,请随时通知我们 :o)
评论中的问题
a)
Subscriber will be on another machine, so I think tcp://
is the solution.*
当然,这里是 NP。 { pgm:// | epgm:// | tipc:// }
-如果进一步向更高性能水平方向发展,传输可能在这里很有趣
b)
Subscriber is going to forward via an XPUB
socket the messages to other subscribers. PUSH/PULL
could work but if I want to pass those subscriptions and their filters to the initial publishers and filter some messages at the source, I have to use PUB/SUB
pattern.
嗯,没提到的O/P。 XPUB
s/XSUB
s 的任何分层都可能工作良好,问题出在连接管理级别
c)
Just to clarify, not losing messages is important only when there is a subscriber. Could you explain this part a little more?
当然,在 RTO 连接的 link 上没有可用的订阅者,准备好“通过网络”立即传递,任何消息都无法传递(并且可能被静默丢弃,这就是你尝试对抗,不是吗?)。这就是 zmq.IMMEDIATE
可以通过调用 .setsockopt()
方法来管理的内容。
我的用例:
- 订阅者将是一个服务器(绑定到一个端口),它将等待来自多个发布者的消息。
- Publishers 将在不同的线程中初始化为客户端(连接到端口)。
- 要在每个线程中发布的数据将是几条消息。
- 订阅者连接后尽快收到每条消息很重要。
- 如果订阅者未连接,那么我不想让发布者线程处于阻塞状态,理想情况下有 1-2 秒左右的超时它会起作用。
慢速连接器问题:
运行 超过 1000 个线程(发布者)只有 1 或 2 次我在 Subscriber 中获取所有数据。 添加几毫秒的睡眠可以解决问题,所以我 99.9% 确定我是众所周知的慢连接综合症的受害者。然而,就我而言,睡眠解决方案并不是一个好的解决方案,因为发布者的连接时间可能是可变的,我希望尽快将数据发送给订阅者。
我解决这个问题的思路和实验代码:
我的解决方案是基于使用 XPUB recv 方法。使用 XPUB 初始化发布者并将 RCVTIMEO 设置为 1000ms。发布者连接后,我添加了一个 recv()
调用来检查是否有订阅者。当我收到订阅消息时,我知道连接已经完成并且我可以发送数据而不会丢失任何数据(除非订阅者发生错误但我不在乎)。
如果我没有收到任何订阅消息,那么在 1000 毫秒后 recv()
超时并且线程终止。
这是 python(pyzmq) 中的示例代码,用于测试此实现(对于发布者,我不使用线程,而是使用 while 循环和 运行 同时发布多个发布者)和它按照我的意愿工作:
publisher.py:
import zmq
def main():
""" main method """
i = 0
while True:
# Prepare context and publisher
context = zmq.Context()
publisher = context.socket(zmq.XPUB)
publisher.connect("tcp://0.0.0.0:5650")
publisher.setsockopt(zmq.RCVTIMEO, 1000)
# Waiting for 1000ms to get a subscription
i = i + 1
try:
publisher.recv()
# Send the message
publisher.send_multipart([b'test', bytes(str(i),'utf-8')])
except Exception as e:
print(e, flush=True)
# Terminate socket and context
publisher.close()
context.term()
if i >= 10000:
break
if __name__ == "__main__":
main()
subscriber.py:
import zmq
def main():
""" main method """
# Prepare our context and subscriber
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
uri = "tcp://0.0.0.0:5650"
subscriber.bind(uri)
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
print('Subscriber connects to %s' % (uri), flush=True)
# Receive messages
i = 0
while True:
[topic, data] = subscriber.recv_multipart()
i = i + 1
print("%s: %s %s" % (i, topic, data), flush=True)
if __name__ == "__main__":
main()
我的问题:
有这么简单的解决方法吗?如果有订阅者处于活动状态,我是否遗漏了任何会导致数据丢失的内容(与慢速加入者相关)?
Q : "Is the solution that simple?"
恰恰相反。对于上面发布的内容,解决方案过于复杂 w.r.t。到目前为止发布的用例要求。
a) 鉴于上述要求,在线程之间进行通信时,可以消除与 ISO-OSI-L3 tcp://
传输 Class 的设置和维护相关的所有成本-位于属于同一进程的同一主机上。而是采用超快、无堆栈、内存映射 inproc://
传输 Class 来避免所有这些低效率。 ZeroMQ API v4.0+在设置inproc://
-TransportClass { .bind() | .connect() }
-顺序上也有无其他条件的便利,让我们尽情享受零拷贝“传输”消息的 MEM 映射超低延迟标记(无需移动 RAM 中的数据字节)- 很酷,不是吗? (除非您需要注入 MITM 协议嗅探,否则请删除 tcp://
矫枉过正)
b) 考虑到上述要求,传递几条消息,其中“静态”SUB
端订阅了所有这些消息,这是非常重要的PUB/SUB
可扩展正式通信模式原型的低效使用。您的代码必须支付所有费用来设置新的 SUB
-实例,然后它会爬行以设置有效连接(在 tcp://
-TransportClass' 堆栈上,希望在 a 下删除) ),下一步是设置一个新的主题过滤器(无论是在早期版本的 SUB 端还是在较新的 ZeroMQ 版本中的 PUB 端运行——所有这些都付出了巨大的代价来接收所有消息——即根本没有过滤)。使用更轻量级的多节点-PUSH/PULL
-在一个节点上的方式可以实现相同的正式服务。如果没有其他任何反向/双向/更复杂的正式通信的需要,这一个 PUSH/PULL
将能够完成所要求的工作。
c) 鉴于上述要求,您的重点似乎是通过连接过早地发送消息而不会丢失消息。在 ZeroMQ 设置中有用于确定这一点的工具,但您不小心使用它们:
- 使用
zmq.IMMEDIATE
可以在没有现成的连接工作(或永远)的情况下使用 AccessNode 的阻塞状态 - 使用 return 代码和
errno
(或zmq.errno()
用于 POSIX-不兼容的操作系统/Win32 等)处理可能有助于您的代码检测和做出反应在 distributed-computing 的整个生命周期中,“自治代理网络”中发生的任何和所有特定情况(无论代理是否确实“物理上”分布或位于同一地点,就像这里的情况一样)。不失去控制是这里的核心责任。什么是控制代码,在失去控制状态下自锁,甚至无法控制自己;) ?
d) 永远不要使用阻塞形式的 { .recv() | .send() | .poll() | ... }
-methods。教科书示例与专业的 signaling/messaging 元平面实现应该是什么样子的反模式相当。确实从来没有 - 参考。上述第 5 项。
e) 最好重新使用 Context()
-实例,而不是像上面概述的那样将其设为 consumable/disposable。线程可以自由共享一个预先实例化的 Context()
引擎,避免接下来大量的重复性附加开销成本,如果每个分叉重新实例化一个 consumable/disposable Context()
,只是一个简短的-lived,对等客户端线程。
f) 如果有人知道更好的解决方案,请随时通知我们 :o)
评论中的问题
a)
Subscriber will be on another machine, so I thinktcp://
is the solution.*
当然,这里是 NP。 { pgm:// | epgm:// | tipc:// }
-如果进一步向更高性能水平方向发展,传输可能在这里很有趣
b)
Subscriber is going to forward via anXPUB
socket the messages to other subscribers.PUSH/PULL
could work but if I want to pass those subscriptions and their filters to the initial publishers and filter some messages at the source, I have to usePUB/SUB
pattern.
嗯,没提到的O/P。 XPUB
s/XSUB
s 的任何分层都可能工作良好,问题出在连接管理级别
c)
Just to clarify, not losing messages is important only when there is a subscriber. Could you explain this part a little more?
当然,在 RTO 连接的 link 上没有可用的订阅者,准备好“通过网络”立即传递,任何消息都无法传递(并且可能被静默丢弃,这就是你尝试对抗,不是吗?)。这就是 zmq.IMMEDIATE
可以通过调用 .setsockopt()
方法来管理的内容。