如何在 Pulsar 中进行广播

How to do broadcast in Pulsar

我正在为我们的集群研究一项技术。 Pulsar 看起来不错,但用法看起来更像是一个排队系统。有排队系统当然好,但我有一个具体的要求:广播。

我们想使用一台机器生成数据并将其发布到 Pulsar 主题。然后我们使用一组服务器,形成一个副本。每个服务器都使用该主题的消息流,并通过 WebSocket 为客户端提供服务。

这与共享订阅不同,因为每个服务器都需要接收全部 条消息,而不是其中的一小部分。

我来到这个 post: https://kafkaesque.io/subscriptions-multiple-groups-of-consumers-on-pulsar-topic/ ,它解释了如何做这样的工作:每个服务器都需要创建一个新的独占订阅,比如使用 UUID 作为其订阅名称,从独特的独家订阅,您可以获得该主题的完整消息流。

但是由于我们的服务器副本可以是动态的,所以一旦某些服务器重新启动,他们将重新创建新的UUID订阅,这将在该主题上留下许多孤立的订阅,最终成为维护的头疼。

有人有使用 Pulsar 设置广播用例的经验吗?

为每个消费者使用独占订阅是确保您的每个消费者都收到关于该主题的所有消息的唯一方法,Pulsar 可以很好地处理多个订阅。

问题似乎是服务器重启用例,我不认为简单地连接新的 UUID 订阅是正确的方法(抛开孤立的订阅)。您确实希望服务器在重新启动后重用之前的订阅。这是因为每个订阅都会跟踪它已处理和确认的主题中的最后一条消息,因此如果您使用相同的订阅 UUID 重新连接,您可以准确地从服务器崩溃前中断的位置继续。如果您使用新的 UUID 连接,那么您将开始处理从该时间点向前产生的消息,并且在重新启动期间产生的所有消息将是 "lost"

因此,您需要找到一种机制来在服务器出现故障时共享这些 UUID,并 return 将它们发送到重新启动的服务器。一种方法是采用类似于 zookeeper 领导者选举的机制,其中每个服务器都被授予一个定期到期的独占租约。服务器必须定期刷新租约以保留它。然后,如果服务器崩溃,它将无法刷新该 UUID 上的租约,并且重新启动的服务器将在尝试重新连接时获得租约。

有关模式的更好解释,请参阅 https://curator.apache.org/curator-recipes/leader-election.html

实际上,我发现 "Reader Interface" 正是针对这种用例:

https://pulsar.apache.org/docs/en/concepts-clients/#reader-interface