如何在子服务器套接字关闭时使 ZMQ 发布客户端套接字缓冲消息

How to make ZMQ pub client socket buffer messages while sub server socket is down

给定 2 个应用程序,其中 应用程序 A 使用发布者客户端有争议地将数据流式传输到应用程序 B,应用程序 B 有一个子服务器套接字来接受该数据,我们如何在应用程序 A 中配置发布客户端套接字,以便当 B 不可用时(比如重新部署、重新启动) A 缓冲所有未决消息,当 B 变为可用时,缓冲消息进入低谷并且套接字赶上实时流?

简而言之,当 SUB SERVER 不可用时,我们如何使 PUB CLIENT 套接字缓冲区消息有一定的限制?

PUB 客户端的默认行为是进入静音状态,但如果我们可以将其更改为限制大小的缓冲区,那就太好了,zmq 可以吗?或者我需要在应用程序级别上做...

我试过在我的套接字中设置 HWM 和 LINGER,但如果我没记错的话,它们只对缓慢的消费者情况负责,在这种情况下,我的发布者连接到订阅者,但订阅者太慢以至于发布者开始缓冲消息(hwm 将限制这些消息的数量)...

我正在使用 jeromq 因为我的目标是 jvm 平台。

正如我们在评论中讨论的那样,publisher 无法在没有任何连接的情况下缓冲消息,它只会丢弃任何新消息:

来自文档:

If a publisher has no connected subscribers, then it will simply drop all messages.

这意味着你的缓冲区需要在 zeromq 的照顾之外。您的缓冲区可以是列表、数据库或您选择的任何其他存储方法,但您不能使用您的发布者来执行此操作。

现在下一个问题是处理如何检测订阅者有 connected/disconnected。这需要告诉我们何时需要从 buffer/filling 缓冲区开始读取。

我建议使用 Socket.monitor and listening for the ZMQ_EVENT_CONNECTED and ZMQ_EVENT_DISCONNECTED,因为这些会在客户端有 connected/disconnected 时告诉您,从而使您能够切换到填充您选择的缓冲区。当然,可能还有其他不直接涉及 zeromq 的方法,但这取决于您的决定。

首先,欢迎来到 Zen-of-Zero 的世界,这里的延迟最重要

序言:

ZeroMQ 由 Pieter HINTJENS 的最终经验丰富的大师团队设计 - Martin SUSTRIK 首先被命名。该设计经过专业制作,以避免任何不必要的延迟。所以询问是否有(有限的)持久性?不,先生,未确认 - PUB/SUB 可扩展的正式通信模式原型不会内置它,因为增加的问题和降低的性能和可扩展性(附加组件延迟、附加处理、附加内存管理)。

如果需要(有限的)持久性(对于缺少远程 SUB 端代理的连接),请随意在应用程序端实现它,或者可以设计和实现新的 ZMTP-兼容这样的行为模式原型,扩展 ZeroMQ 框架,如果这样的工作进入稳定和公开接受的状态,但不要求高性能、延迟削减的标准 PUB/SUB 已经完善了几乎线性的可扩展性 ad astra,朝这个方向修改。绝对不是办法。

解决方案?

应用端可以很容易地实现你添加的逻辑,使用双指针循环缓冲区,工作在某种(app-side-managed)-Persistence-PROXY,但在 PUB-sender.

的前面

如果您的设计也喜欢使用最近可用的内置 ZeroMQ-socket_monitor-组件

您的设计可能会成功地从 ZeroMQ 内部细节中榨取一些额外的调味料 设置一个额外的控制层并在那里接收 事件流 从 "inside" PUB 端 Context -instance,其中一些额外的网络和连接管理相关事件可能会为您的 (app-side-managed)-Persistence-PROXY 带来更多启发

但是,请注意

The _zmq_socket_monitor()_ method supports only connection-oriented transports, that is, TCP, IPC, and TIPC.

所以人们可能会直接忘记这一点,以防万一计划使用任何最终有趣的传输方式-类 { inproc:// | norm:// | pgm:// | epgm:// | vmci:// }


注意!

我们的社区荣誉会员 smac89 提供的信息不准确,如果没有错误的话,他已尽力解决您在评论中表达的额外兴趣:

"...zmq optimizes publishing on topics? like if you keep publishing on some 100char long topic rapidly, is it actually sending the topic every time or it maps to some int and sends the int subsequently...?"

告诉你:

"It will always publish the topic. When I use the pub-sub pattern, I usually publish the topic first and then the actual message, so in the subscriber I just read the first frame and ignore it and then read the actual message"

ZeroMQ 不是这样工作的。没有什么是 "separate" <topic> 后跟 <message-body>,而是相反的

TOPIC 主题过滤 的机械化以一种非常不同的方式工作。

1) 你永远不知道,谁 .connect()-s:
i.e.几乎可以肯定版本 2.x 直到版本 4.2+ 将以不同的方式处理主题过滤( ZMTP:RFC 定义初始能力版本握手,让 Context-实例决定,必须使用哪个版本的主题过滤:
ver 2.x 用于将所有消息移动到所有对等点,并让 all SUB-sides(版本 2.x+ )被传递消息(并让 SUB-side Context-instance 处理本地 topic-list filter processing )

whereas
ver 4.2+肯定会执行topic-list filter processing on **the PUB-side Context-instance(CPU-usage grows, network-transport 相反),所以你的 SUB-side 永远不会被传递一个字节"useless" 阅读了 "not-subscribed" 到 条消息。

2) (您可以,但是) 没有必要将 "topic" 分隔到如此隐含的多帧消息的第一帧中。也许恰恰相反(在高性能、低延迟的分布式系统设计中这样做是一种相当反模式。

主题过滤过程已定义,按字节工作,从左到右,每个主题列表成员值与传递的消息负载的模式匹配。

添加额外的数据、额外的帧管理处理只会增加端到端延迟和处理开销。这样做而不是正确的 设计工作绝不是一个好主意。


结语:

在专业领域没有轻松的胜利,也没有任何唾手可得的果实 design, the less if 或超低延迟是设计目标。

另一方面,确保 ZeroMQ 框架是在考虑到这一点的情况下创建的,并且这些努力以稳定、最终性能良好平衡的工具集为智能(通过设计)、快速(在操作中)和可扩展性(令人羡慕的程度)signaling/messaging 由于这种设计智慧,人们喜欢正确使用服务。

希望您对 ZeroMQ 感到满意,并随时在您选择的应用程序套件中添加 ZeroMQ 层的任何额外功能集 "in front"。

我发布了一个快速更新,因为其他两个答案(虽然非常有用的答案实际上是错误的),我不希望其他人被我接受的答案误导。 不仅可以使用 zmq 做到这一点,它实际上是默认行为

诀窍是,如果您的发布者客户端在它不断丢弃消息之前从未连接到订阅者服务器(这就是我认为它不缓冲消息的原因),但是如果您的发布者连接到订阅者并且您重新启动订阅者,发布者将缓冲消息,直到达到 HWM,这正是我所要求的......所以简而言之,发布者想知道另一端有人接受消息,只有在那之后它才会缓冲消息......

这里有一些示例代码演示了这一点(您可能需要做一些基本的编辑才能编译它)。

我只使用了这个依赖项org.zeromq:jeromq:0.5.1

zmq-publisher.kt

fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.PUB)

   socket.hwm = 10000
   socket.linger = 0
   "connecting to $uri".log()
   socket.connect(uri)

   fun publish(path: String, msg: Msg) {
      ">> $path | ${msg.json()}".log()
      socket.sendMore(path)
      socket.send(msg.toByteArray())
   }

   var count = 0

   while (notInterrupted()) {
      val msg = telegramMessage("message : ${++count}")
      publish("/some/feed", msg)
      println()

      sleepInterruptible(1.second)
   }
}

当然还有zmq-subscriber.kt


fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.SUB)

   socket.hwm = 10000
   socket.receiveTimeOut = 250

   "connecting to $uri".log()
   socket.bind(uri)

   socket.subscribe("/some/feed")

   while (true) {
      val path = socket.recvStr() ?: continue
      val bytes = socket.recv()
      val msg = Msg.parseFrom(bytes)
      "<< $path | ${msg.json()}".log()
   }
}

先尝试 运行 没有订阅者的发布者,然后当您启动订阅者时,您错过了到目前为止的所有消息...现在无需重新启动发布者,停止订阅者等待一段时间,然后重新启动它。

这是我的一项服务实际受益于此的示例... 这是结构 [current service]sub:server <= pub:client[service being restarted]sub:server <=* pub:client[multiple publishers]

因为我在中间重启了服务,所有发布者都开始缓冲他们的消息,观察到每秒约 200 条消息的最终服务观察到下降到 0(那些 1 或 2 是心跳)然后突然爆发 1000 + 消息进来,因为所有发布者都刷新了他们的缓冲区(重新启动大约需要 5 秒)...实际上我在这里没有丢失任何消息...

请注意,您必须有 subscriber:server <= publisher:client 对(这样发布者就知道 "there is only one place i need to deliver these messages to"(您可以尝试绑定发布者并连接订阅者,但您将不会再看到发布者缓冲消息,因为它有问题如果刚刚断开连接的订阅者这样做是因为它不再需要数据或因为它失败了)