从 Kafka Store Changelog 读取时出现 OffsetOutOfRangeException
OffsetOutOfRangeException when reading from Kafka Store Changelog
我有一个 Kafka Streams 应用程序正在从商店更新日志中读取,偶尔会抛出此错误:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-partition=offset}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:928)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:319)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
我认为消费者应该默认 latest
。即使我尝试使用 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
将我的流属性配置为 latest
或 earliest
,我仍然会看到此错误。为什么?
配置消费者重置策略只能用于读取实际输入的主题。
对于 changelog 主题(即恢复案例),重置策略在内部始终设置为 none
,因为 Kafka Streams 需要手动处理这种情况。异常被捕获并记录为 WARN 级别的消息。之后,Kafka Streams 会进行一些内部清理和手动 #seekToBeginning()
以干净地重新启动恢复过程。
没有理由担心这个。但是,会记录一条 WARN 消息以通知您该事件。
我有一个 Kafka Streams 应用程序正在从商店更新日志中读取,偶尔会抛出此错误:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-partition=offset}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:928)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84)
at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:319)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
我认为消费者应该默认 latest
。即使我尝试使用 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
将我的流属性配置为 latest
或 earliest
,我仍然会看到此错误。为什么?
配置消费者重置策略只能用于读取实际输入的主题。
对于 changelog 主题(即恢复案例),重置策略在内部始终设置为 none
,因为 Kafka Streams 需要手动处理这种情况。异常被捕获并记录为 WARN 级别的消息。之后,Kafka Streams 会进行一些内部清理和手动 #seekToBeginning()
以干净地重新启动恢复过程。
没有理由担心这个。但是,会记录一条 WARN 消息以通知您该事件。