KAFKA-STREAM:当偏移量不再存在时,Kafka-stream 卡住了

KAFKA-STREAM : Kafka-stream stucked when offset is no more existing

我也在kafka Jira描述了这个问题: https://issues.apache.org/jira/browse/KAFKA-13014

我们有多个实例和线程的 kafka-stream。

这个 kafka-stream 消耗了很多主题。

其中一个主题分区一天无法访问,主题保留时间为 4 小时。

解决问题后,kafka-stream 正在尝试从不再存在的偏移量中消费:

Kafka-消费者组描述:

我们可以看到 KS 正在等待的当前偏移量是 59754934,但此分区的新第一个偏移量是 264896001。

Kafka-stream不抛异常的问题

这是我看到的唯一日志

08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 信息 o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data- mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] 更新分配 with08:44:53.924 [talaria-data-mixed-prod-c3d6ac16- 516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 信息 o.a.k.c.c.i.ConsumerCoordinator - [消费者 clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-消费者,groupId=talaria -data-mixed-prod] 使用已分配分区更新分配: [adm__article_ean_repartition_v3-10、adm__article_itm_repartition_v3-10、adm__article_sign_repartition_v3-10、adm__article_stock_repartition_v3-10] 当前拥有的分区: [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10] 添加的分区(分配 - 拥有): [] 撤销的分区(拥有 - 分配): [] 08:44:53.924 [talaria-data-mixed -prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-消费者,groupId=tala ria-data-mixed-prod] 通知分配者有关新的分配(partitions=[adm__article_stock_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_ean_repartition_v3-10], userDataSize=398)08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] 信息 o.a.k.s.p.i.StreamsPartitionAssignor - 流线程 [talaria-data-mixed-prod-c3d6ac16-516c- 49ee-a34e-bde5f3f629dc-StreamThread-2-consumer] 没有请求后续再平衡,重置再平衡计划。08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread -2] INFO o.a.k.s.p.internals.TaskManager - stream-thread [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] Handle new assignment with: New active tasks: [0_10] 新备任务:[0_17, 0_21] 现有活动任务:[0_10] 现有备任务:[0_17, 0_21]08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c- 49ee-a34e-bde5f3f629dc-StreamThread-2-消费者,groupId=talaria-data-mixed-pro d] 添加新分配的分区:

PI:代理卡夫卡版本:5.3.4-ccs

问题比我想象的要复杂

这是混合使用 exactly_once 和使用状态存储的问题

当应用程序在不等待流关闭的情况下兑现时,更改日志主题的最新消息的事务被中止,因此当我们重新启动 kafka-stream 时,拓扑在启动之前等待重新加载本地 rocksdb 存储使用消息。

错误就在那里,因为他们使用 consumer-metadate "topic.lastoffset" == curent_consumer_offset

检查

但应该是这样的:

消费者元数据“topic.last_commited_message_and_transaction_offset”==curent_consumer_offset

我通过切换到 at_least_one 修复了该问题,但我认为它已在 2.7.1

上修复