如何replace/transition一个Kafka消费组?

How to replace/transition a Kafka consumer group?

我有一个包含多个 Kafka 流的存储库。我想将其中一个流提取到它自己的存储库中。但是,我不确定如何处理该流的消费者群体。这就是我的意思:在新的存储库中,流将具有不同的 application.id。据我了解,消费者组的名称是根据 application.id 设置的。如果我简单地关闭旧流,对于每个主题的每个分区,新流将从第零个偏移量开始,而不是从旧流停止的偏移量开始。这将导致输出主题中出现重复消息。

是否有一些关于如何处理此问题的通用 rule/best 实践?我需要:

流使用了相当多的输入主题(大约 20 个,幸运的是每个主题都有一个分区),所以我不确定如何处理这个问题。

(0) 如果可能,我建议您尝试保留 application.id,这样您的所有问题都会消失。对于这种情况,您可以先停止旧应用程序然后启动新应用程序,或者甚至先启动所有新实例然后拆除所有旧实例。由于新旧应用程序都连接到同一个 Kafka 集群,因此将实现无缝切换。

(1) 只有你的 Kafka Streams 应用程序是无状态的,你想做的才有效;对于有状态应用程序,需要更多步骤才能将状态转移到新的应用程序实例。或者新实例将从空状态开始(也可以,具体取决于您的应用程序要求)。

(2) 您首先需要停止所有旧应用程序实例并接收旧 application.id 每个输入主题分区的最后提交偏移量(通过 bin/kafka-consumer-groups.sh 使用 --describe --group选项)之后,您需要使用新的 application.id 提交这些偏移量(同样,您可以使用选项 --to-offset 来使用 bin/kafka-consumer-groups.sh)。 (有关详细信息,请参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling)之后,您可以启动新的应用程序实例,该实例将为新的 application.id.

获取已提交的偏移量