Apache Flink - 流式应用程序在停止和启动后不会从检查点启动

Apache Flink - streaming app doesn't start from checkpoint after stop and start

我在本地有以下 Flink 流应用程序 运行ning,用 SQL API:

编写
object StreamingKafkaJsonsToCsvLocalFs {

  val brokers = "localhost:9092"
  val topic = "test-topic"
  val consumerGroupId = "test-consumer"
  val kafkaTableName = "KafKaTable"
  val targetTable = "TargetCsv"
  val targetPath = f"file://${new java.io.File(".").getCanonicalPath}/kafka-to-fs-csv"

  def generateKafkaTableDDL(): String = {
    s"""
       |CREATE TABLE $kafkaTableName (
       |  `kafka_offset` BIGINT METADATA FROM 'offset',
       |  `seller_id` STRING
       |) WITH (
       |  'connector' = 'kafka',
       |  'topic' = '$topic',
       |  'properties.bootstrap.servers' = 'localhost:9092',
       |  'properties.group.id' = '$consumerGroupId',
       |  'scan.startup.mode' = 'earliest-offset',
       |  'format' = 'json'
       |)
       |""".stripMargin
  }

  def generateTargetTableDDL(): String = {
    s"""
       |CREATE TABLE $targetTable (
       |  `kafka_offset` BIGINT,
       |  `seller_id` STRING
       |  )
       |WITH (
       |  'connector' = 'filesystem',
       |  'path' = '$targetPath',
       |  'format' = 'csv',
       |  'sink.rolling-policy.rollover-interval' = '10 seconds',
       |  'sink.rolling-policy.check-interval' = '1 seconds'
       |)
       |""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")

    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()

    val tblEnv = StreamTableEnvironment.create(env, settings)

    tblEnv.executeSql(generateKafkaTableDDL())
    tblEnv.executeSql(generateTargetTableDDL())

    tblEnv.from(kafkaTableName).executeInsert(targetTable).await()
    tblEnv.executeSql("kafka-json-to-fs")
  }
}

如您所见,检查点已启用,当我执行此应用程序时,我看到检查点文件夹已创建并填充。

我面临的问题是——当我停止和启动我的应用程序时(从 IDE),我希望它从上次执行时停止的同一点开始,但我却看到它消耗主题中最早偏移量的所有偏移量(我从包含零偏移量的新生成的输出文件中看到它,尽管之前的 运行 处理了这些偏移量)。

关于 Flink 中的检查点,我错过了什么?我希望它正好是一次。

Flink 仅在从故障中恢复时从检查点重新启动,或者通过 the command line or REST API 从保留的检查点显式重新启动时。否则,KafkaSource从代码中配置的偏移量开始,默认为最早的偏移量。

如果您没有其他状态,则可以改为依赖提交的偏移量作为真实来源,并将 Kafka 连接器配置为使用提交的偏移量作为起始位置。


Flink 通过检查点实现的容错并非旨在支持 mini-cluster 部署,例如在 IDE 中 运行 时使用的部署。通常作业管理器和任务管理器运行在不同的进程中,作业管理器可以检测到任务管理器发生故障,并可以安排重新启动。