再次执行 yarn application kill 和 运行 后,flink 会从上次的 offset 恢复吗?

Will flink resume from the last offset after executing yarn application kill and running again?

我使用 FlinkKafkaConsumer 来消费 kafka 并启用检查点。现在我对偏移量管理和检查点机制有点困惑。 我已经知道 flink 会开始读取消费者组的分区。 https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration 并且偏移量将存储到远程文件系统中的检查点中。 https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance

如果我通过执行 yarn application -kill appid 停止应用程序会发生什么 和 运行 像 ./bin flink run ... 这样的启动命令? flink 会从 checkpoint 还是从 kafka 管理的 group-id 获取偏移量?

如果您在没有定义保存点的​​情况下再次 运行 作业 ($ bin/flink run -s :savepointPath [:runArgs]),flink 将尝试从 kafka(在旧版本中从 zookeeper)获取您的消费者组的偏移量。但是你会失去你的 flink 作业的所有其他状态(如果你有一个无状态的 flink 作业,这可能是可以忽略的)。

我必须承认这种行为非常令人困惑。默认情况下,在没有保存点的情况下开始作业就像从零开始一样。据我所知,只有 kafka 源代码的实现与该行为不同。如果您想更改该行为,您可以将 FlinkKafkaConsumer[08/09/10]setStartFromGroupOffsets 设置为 false。此处对此进行了描述:Kafka Consumers Start Position Configuration

可能值得仔细看看 flink 的文档:What is a savepoint and how does it differ from checkpoints

一言以蔽之

检查点:

The primary purpose of Checkpoints is to provide a recovery mechanism in case of unexpected job failures. A Checkpoint’s lifecycle is managed by Flink

保存点:

Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume

目前正在讨论如何 "unify" 保存点和检查点。在这里找到很多技术细节:Flink improvals 47: Checkpoints vs Savepoints