如何replace/transition一个Kafka消费组?
How to replace/transition a Kafka consumer group?
我有一个包含多个 Kafka 流的存储库。我想将其中一个流提取到它自己的存储库中。但是,我不确定如何处理该流的消费者群体。这就是我的意思:在新的存储库中,流将具有不同的 application.id
。据我了解,消费者组的名称是根据 application.id
设置的。如果我简单地关闭旧流,对于每个主题的每个分区,新流将从第零个偏移量开始,而不是从旧流停止的偏移量开始。这将导致输出主题中出现重复消息。
是否有一些关于如何处理此问题的通用 rule/best 实践?我需要:
- 关闭旧流;
- 检查每个主题的每个分区的旧流消费者组的偏移量;
- "tell" 新流从各自的偏移量开始。我怎么"tell" :) 呢?我的意思是 - 如果我使用
kafka-console-consumer
,则有 partition
和 offset
选项,但是流在幕后启动自己的消费者,所以我不确定如何控制它。
流使用了相当多的输入主题(大约 20 个,幸运的是每个主题都有一个分区),所以我不确定如何处理这个问题。
(0) 如果可能,我建议您尝试保留 application.id
,这样您的所有问题都会消失。对于这种情况,您可以先停止旧应用程序然后启动新应用程序,或者甚至先启动所有新实例然后拆除所有旧实例。由于新旧应用程序都连接到同一个 Kafka 集群,因此将实现无缝切换。
(1) 只有你的 Kafka Streams 应用程序是无状态的,你想做的才有效;对于有状态应用程序,需要更多步骤才能将状态转移到新的应用程序实例。或者新实例将从空状态开始(也可以,具体取决于您的应用程序要求)。
(2) 您首先需要停止所有旧应用程序实例并接收旧 application.id
每个输入主题分区的最后提交偏移量(通过 bin/kafka-consumer-groups.sh
使用 --describe --group
选项)之后,您需要使用新的 application.id
提交这些偏移量(同样,您可以使用选项 --to-offset
来使用 bin/kafka-consumer-groups.sh
)。 (有关详细信息,请参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling)之后,您可以启动新的应用程序实例,该实例将为新的 application.id
.
获取已提交的偏移量
我有一个包含多个 Kafka 流的存储库。我想将其中一个流提取到它自己的存储库中。但是,我不确定如何处理该流的消费者群体。这就是我的意思:在新的存储库中,流将具有不同的 application.id
。据我了解,消费者组的名称是根据 application.id
设置的。如果我简单地关闭旧流,对于每个主题的每个分区,新流将从第零个偏移量开始,而不是从旧流停止的偏移量开始。这将导致输出主题中出现重复消息。
是否有一些关于如何处理此问题的通用 rule/best 实践?我需要:
- 关闭旧流;
- 检查每个主题的每个分区的旧流消费者组的偏移量;
- "tell" 新流从各自的偏移量开始。我怎么"tell" :) 呢?我的意思是 - 如果我使用
kafka-console-consumer
,则有partition
和offset
选项,但是流在幕后启动自己的消费者,所以我不确定如何控制它。
流使用了相当多的输入主题(大约 20 个,幸运的是每个主题都有一个分区),所以我不确定如何处理这个问题。
(0) 如果可能,我建议您尝试保留 application.id
,这样您的所有问题都会消失。对于这种情况,您可以先停止旧应用程序然后启动新应用程序,或者甚至先启动所有新实例然后拆除所有旧实例。由于新旧应用程序都连接到同一个 Kafka 集群,因此将实现无缝切换。
(1) 只有你的 Kafka Streams 应用程序是无状态的,你想做的才有效;对于有状态应用程序,需要更多步骤才能将状态转移到新的应用程序实例。或者新实例将从空状态开始(也可以,具体取决于您的应用程序要求)。
(2) 您首先需要停止所有旧应用程序实例并接收旧 application.id
每个输入主题分区的最后提交偏移量(通过 bin/kafka-consumer-groups.sh
使用 --describe --group
选项)之后,您需要使用新的 application.id
提交这些偏移量(同样,您可以使用选项 --to-offset
来使用 bin/kafka-consumer-groups.sh
)。 (有关详细信息,请参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling)之后,您可以启动新的应用程序实例,该实例将为新的 application.id
.