Kafka Stream 程序正在重新处理已经处理过的事件
Kafka Stream program is reprocessing the already processed events
我向 Kafka 转发了一些事件并启动了我的 Kafka 流程序。我的程序开始处理事件并完成。一段时间后,我停止了 Kafka 流应用程序,然后重新开始。观察到My Kafka流程序正在处理之前已经处理过的事件。
据我了解,Kafka 流在内部维护每个应用程序 ID 的输入主题本身的偏移量。但是这里重新处理已经处理过的事件。
如何验证 Kafka 流处理完成了哪个偏移量? Kafka Stream 如何持久化这些书签? Kafka stream 将在什么基础上从哪个 Kafka 偏移量开始读取来自 Kafka 的事件?
如果 Kafka steam 抛出异常,那么它是否重新处理已经处理过的事件?
请解惑。
请帮助我了解更多。
Kafka Streams 内部使用 KafkaConsumer
并且所有 运行 实例使用 application.id
作为 group.id
形成一个消费者组。偏移量会定期(可配置)提交给 Kafka 集群。因此,在使用相同的 application.id
重新启动时,Kafka Streams 应该获取最新提交的偏移量并从那里继续处理。
您可以使用 bin/kafka-consumer-groups.sh
工具检查任何其他消费者组的提交偏移量。
我向 Kafka 转发了一些事件并启动了我的 Kafka 流程序。我的程序开始处理事件并完成。一段时间后,我停止了 Kafka 流应用程序,然后重新开始。观察到My Kafka流程序正在处理之前已经处理过的事件。
据我了解,Kafka 流在内部维护每个应用程序 ID 的输入主题本身的偏移量。但是这里重新处理已经处理过的事件。
如何验证 Kafka 流处理完成了哪个偏移量? Kafka Stream 如何持久化这些书签? Kafka stream 将在什么基础上从哪个 Kafka 偏移量开始读取来自 Kafka 的事件?
如果 Kafka steam 抛出异常,那么它是否重新处理已经处理过的事件?
请解惑。
请帮助我了解更多。
Kafka Streams 内部使用 KafkaConsumer
并且所有 运行 实例使用 application.id
作为 group.id
形成一个消费者组。偏移量会定期(可配置)提交给 Kafka 集群。因此,在使用相同的 application.id
重新启动时,Kafka Streams 应该获取最新提交的偏移量并从那里继续处理。
您可以使用 bin/kafka-consumer-groups.sh
工具检查任何其他消费者组的提交偏移量。