FlinkKafkaConsumer read from the start of the topic

I am trying to read a Kafka topic as a DataStream in Flink, using FlinkKafkaConsumer.

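The problem I am facing is that, after a few rounds of testing, I want to start reading from the beginning of the topic again for some additional tests. Ideally, changing the group.id and restarting the job should be enough to do this.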

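But after the restart, the consumer is still able to find the old checkpoints / Kafka commits. I also tried deleting all checkpoints, deleting all configMaps and deployments, and restarting everything, but the same thing happens again. I can see the offsets being set in the task manager logs.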

How can I start reading from the beginning of the topic again?

2021-02-17 10:08:41,287 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Discovered group coordinator idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 2147483647 rack: null)
2021-02-17 10:08:41,324 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-0 to the committed offset FetchPosition{offset=40204, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,326 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-1 to the committed offset FetchPosition{offset=39962, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-4 to the committed offset FetchPosition{offset=40444, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-2 to the committed offset FetchPosition{offset=40423, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-3 to the committed offset FetchPosition{offset=40368, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}

If the job is not being restored from a checkpoint or started from a savepoint, this should do the trick:

FlinkKafkaConsumer<T> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();

Without this call, the consumer defaults to starting from the group's committed offsets.

It sounds like you have already seen this explanation in the docs, but just in case:

Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. On restore, the start position of each Kafka partition is determined by the offsets stored in the savepoint or checkpoint (please see the next section for information about checkpointing to enable fault tolerance for the consumer).
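To make the precedence described above easier to see, here is a minimal sketch in plain Java. It is not Flink code: the class `StartPositionSketch`, the enum `StartupMode`, and the method `resolveStart` are hypothetical names invented for illustration, and the logic only models the documented behaviour (restored state first, then an explicit start mode, then committed group offsets, then auto.offset.reset as the fallback).

```java
import java.util.Optional;

public class StartPositionSketch {

    /** Hypothetical stand-in for the start mode configured on the consumer. */
    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST }

    /**
     * Models where one partition would start reading.
     * All parameters are illustrative; this is not part of any Flink API.
     */
    static String resolveStart(Optional<Long> restoredOffset,
                               StartupMode mode,
                               Optional<Long> committedOffset,
                               String autoOffsetReset) {
        // 1. Offsets restored from a checkpoint or savepoint take priority.
        if (restoredOffset.isPresent()) {
            return "restored:" + restoredOffset.get();
        }
        // 2. An explicit start mode (e.g. setStartFromEarliest()) comes next.
        if (mode == StartupMode.EARLIEST) {
            return "earliest";
        }
        if (mode == StartupMode.LATEST) {
            return "latest";
        }
        // 3. Default mode: use the group's committed offset if one exists.
        if (committedOffset.isPresent()) {
            return "committed:" + committedOffset.get();
        }
        // 4. No committed offset for this group: fall back to auto.offset.reset.
        return autoOffsetReset;
    }

    public static void main(String[] args) {
        // Default mode with a committed offset, as in the logs in the question.
        System.out.println(resolveStart(
                Optional.empty(), StartupMode.GROUP_OFFSETS, Optional.of(40204L), "latest"));
        // Explicit earliest start, no restored state.
        System.out.println(resolveStart(
                Optional.empty(), StartupMode.EARLIEST, Optional.of(40204L), "latest"));
    }
}
```

In this model, a committed offset only matters when the job starts fresh in the default mode, which is why calling setStartFromEarliest() on a job that is not restored from state should bypass it.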

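I don't think the problem is that the consumer can find old commits or old checkpoints, as long as you are starting the job from scratch and not from a savepoint.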

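The problem seems to be that you have not set auto.offset.reset on the Kafka consumer, which means the default value, latest, is used. So whenever you start a job with a new group.id, which has no committed offsets yet, it will always start from the latest offset in each partition. You can change this by simply setting the auto.offset.reset property to earliest in the properties passed to the KafkaConsumer.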
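As a rough sketch of that suggestion, the properties could be built like this. The class name `ConsumerProps`, the helper `build`, and the broker address and group id values are placeholders I made up; only the property keys (bootstrap.servers, group.id, auto.offset.reset) are standard Kafka consumer settings.

```java
import java.util.Properties;

public class ConsumerProps {

    /** Hypothetical helper that builds consumer properties with an earliest reset policy. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Used only when the group has no committed offset for a partition.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own broker address and group id.
        Properties props = build("localhost:9092", "my-new-group");
        System.out.println(props.getProperty("auto.offset.reset"));

        // The properties would then be passed to the consumer, for example:
        // new FlinkKafkaConsumer<>("adhoc-testing", new SimpleStringSchema(), props);
    }
}
```

Note that this setting only takes effect for a group without committed offsets, so it would normally be combined with a group.id that has not been used before.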