消费者组的 Kafka Stream 偏移量重置为零

Kafka Stream offset reset to zero for consumer group

我编写了 Kafka Streaming 应用程序,它仅根据某些条件过滤行并将其加载到 MongoDB。

流式处理工作正常,但由于我的代码中存在一些缺陷,我想再次重新处理整个数据。

一种方法是终止流式应用,更改消费者组 ID,从 mongo 中删除数据并重新运行应用。

如何在不改变消费者组id的情况下实现这个场景。

<< 我使用的是 Kafka 0.10 版本 >>

非常感谢 帕里

收到来自 Matthias J. Sax matthias@confluent.io -

的更新

目前,更改应用程序 ID 是最好的方法。 正确清理应用程序状态有点棘手。我们 目前正在为此进行改进——应该可用 很快。

https://issues.apache.org/jira/browse/KAFKA-3185

干杯 比利

Apache Kafka 0.10.0.1(于 8 月发布,而最初的问题是在 7 月提出的)附带了一个新的 Kafka Streams 应用程序重置工具,这是一个比简单的 better/cleaner 更简单的解决方案重命名 application.id.

您可以通过脚本 bin/kafka-streams-application-reset.sh 执行该工具,该脚本还将打印一条 usage/help 消息。

示例:

# Run this only after ALL application instances were stopped!
$ bin/kafka-streams-application-reset --application-id my-streams-app \
                                      --input-topics my-input-topic \
                                      --intermediate-topics rekeyed-topic \
                                      --bootstrap-servers brokerHost:9092 \
                                      --zookeeper zookeeperHost:2181

也就是说,我建议阅读前面提到的 Matthias J. Sax 撰写的 Data Reprocessing with Kafka Streams: Resetting a Streams Application,以了解更多详细信息。那篇文章还解释了为什么简单地重命名 application.id(这是直到现在的解决方法)不是最好的主意。