Apache Flink - 流式应用程序在停止和启动后不会从检查点启动
Apache Flink - streaming app doesn't start from checkpoint after stop and start
我在本地有以下 Flink 流应用程序 运行ning,用 SQL API:
编写
object StreamingKafkaJsonsToCsvLocalFs {
val brokers = "localhost:9092"
val topic = "test-topic"
val consumerGroupId = "test-consumer"
val kafkaTableName = "KafKaTable"
val targetTable = "TargetCsv"
val targetPath = f"file://${new java.io.File(".").getCanonicalPath}/kafka-to-fs-csv"
def generateKafkaTableDDL(): String = {
s"""
|CREATE TABLE $kafkaTableName (
| `kafka_offset` BIGINT METADATA FROM 'offset',
| `seller_id` STRING
|) WITH (
| 'connector' = 'kafka',
| 'topic' = '$topic',
| 'properties.bootstrap.servers' = 'localhost:9092',
| 'properties.group.id' = '$consumerGroupId',
| 'scan.startup.mode' = 'earliest-offset',
| 'format' = 'json'
|)
|""".stripMargin
}
def generateTargetTableDDL(): String = {
s"""
|CREATE TABLE $targetTable (
| `kafka_offset` BIGINT,
| `seller_id` STRING
| )
|WITH (
| 'connector' = 'filesystem',
| 'path' = '$targetPath',
| 'format' = 'csv',
| 'sink.rolling-policy.rollover-interval' = '10 seconds',
| 'sink.rolling-policy.check-interval' = '1 seconds'
|)
|""".stripMargin
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.enableCheckpointing(1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
val settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build()
val tblEnv = StreamTableEnvironment.create(env, settings)
tblEnv.executeSql(generateKafkaTableDDL())
tblEnv.executeSql(generateTargetTableDDL())
tblEnv.from(kafkaTableName).executeInsert(targetTable).await()
tblEnv.executeSql("kafka-json-to-fs")
}
}
如您所见,检查点已启用,当我执行此应用程序时,我看到检查点文件夹已创建并填充。
我面临的问题是——当我停止和启动我的应用程序时(从 IDE),我希望它从上次执行时停止的同一点开始,但我却看到它消耗主题中最早偏移量的所有偏移量(我从包含零偏移量的新生成的输出文件中看到它,尽管之前的 运行 处理了这些偏移量)。
关于 Flink 中的检查点,我错过了什么?我希望它正好是一次。
Flink 仅在从故障中恢复时从检查点重新启动,或者通过 the command line or REST API 从保留的检查点显式重新启动时。否则,KafkaSource从代码中配置的偏移量开始,默认为最早的偏移量。
如果您没有其他状态,则可以改为依赖提交的偏移量作为真实来源,并将 Kafka 连接器配置为使用提交的偏移量作为起始位置。
Flink 通过检查点实现的容错并非旨在支持 mini-cluster 部署,例如在 IDE 中 运行 时使用的部署。通常作业管理器和任务管理器运行在不同的进程中,作业管理器可以检测到任务管理器发生故障,并可以安排重新启动。
我在本地有以下 Flink 流应用程序 运行ning,用 SQL API:
编写object StreamingKafkaJsonsToCsvLocalFs {
val brokers = "localhost:9092"
val topic = "test-topic"
val consumerGroupId = "test-consumer"
val kafkaTableName = "KafKaTable"
val targetTable = "TargetCsv"
val targetPath = f"file://${new java.io.File(".").getCanonicalPath}/kafka-to-fs-csv"
def generateKafkaTableDDL(): String = {
s"""
|CREATE TABLE $kafkaTableName (
| `kafka_offset` BIGINT METADATA FROM 'offset',
| `seller_id` STRING
|) WITH (
| 'connector' = 'kafka',
| 'topic' = '$topic',
| 'properties.bootstrap.servers' = 'localhost:9092',
| 'properties.group.id' = '$consumerGroupId',
| 'scan.startup.mode' = 'earliest-offset',
| 'format' = 'json'
|)
|""".stripMargin
}
def generateTargetTableDDL(): String = {
s"""
|CREATE TABLE $targetTable (
| `kafka_offset` BIGINT,
| `seller_id` STRING
| )
|WITH (
| 'connector' = 'filesystem',
| 'path' = '$targetPath',
| 'format' = 'csv',
| 'sink.rolling-policy.rollover-interval' = '10 seconds',
| 'sink.rolling-policy.check-interval' = '1 seconds'
|)
|""".stripMargin
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.enableCheckpointing(1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
val settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build()
val tblEnv = StreamTableEnvironment.create(env, settings)
tblEnv.executeSql(generateKafkaTableDDL())
tblEnv.executeSql(generateTargetTableDDL())
tblEnv.from(kafkaTableName).executeInsert(targetTable).await()
tblEnv.executeSql("kafka-json-to-fs")
}
}
如您所见,检查点已启用,当我执行此应用程序时,我看到检查点文件夹已创建并填充。
我面临的问题是——当我停止和启动我的应用程序时(从 IDE),我希望它从上次执行时停止的同一点开始,但我却看到它消耗主题中最早偏移量的所有偏移量(我从包含零偏移量的新生成的输出文件中看到它,尽管之前的 运行 处理了这些偏移量)。
关于 Flink 中的检查点,我错过了什么?我希望它正好是一次。
Flink 仅在从故障中恢复时从检查点重新启动,或者通过 the command line or REST API 从保留的检查点显式重新启动时。否则,KafkaSource从代码中配置的偏移量开始,默认为最早的偏移量。
如果您没有其他状态,则可以改为依赖提交的偏移量作为真实来源,并将 Kafka 连接器配置为使用提交的偏移量作为起始位置。
Flink 通过检查点实现的容错并非旨在支持 mini-cluster 部署,例如在 IDE 中 运行 时使用的部署。通常作业管理器和任务管理器运行在不同的进程中,作业管理器可以检测到任务管理器发生故障,并可以安排重新启动。