从 Kafka Store Changelog 读取时出现 OffsetOutOfRangeException

OffsetOutOfRangeException when reading from Kafka Store Changelog

我有一个 Kafka Streams 应用程序正在从商店更新日志中读取,偶尔会抛出此错误:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-partition=offset}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:928)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:319)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

我认为消费者应该默认 latest。即使我尝试使用 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 将我的流属性配置为 latestearliest,我仍然会看到此错误。为什么?

配置消费者重置策略只能用于读取实际输入的主题。

对于 changelog 主题(即恢复案例),重置策略在内部始终设置为 none,因为 Kafka Streams 需要手动处理这种情况。异常被捕获并记录为 WARN 级别的消息。之后,Kafka Streams 会进行一些内部清理和手动 #seekToBeginning() 以干净地重新启动恢复过程。

没有理由担心这个。但是,会记录一条 WARN 消息以通知您该事件。