Flink failed to trigger checkpoint when using Table API
My Flink streaming application (v1.14.4) contains a JDBC connector that is used to fetch the initial data from a MySQL server.
Logic:
- JDBC table source -> select.where() -> convert to DataStream
- Kafka DataStream joins the JDBC table -> further computation
When I run the application locally, I see the following in the logs:
14:52:00.401 [Source: TableSourceScan(table=[[default_catalog, default_database, asset, project=[id, status]]], fields=[id, status]) -> Calc(select=[CAST(_UTF-16LE'sergey-test-asset-id':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS id, status], where=[(id = _UTF-16LE'sergey-test-asset-id')]) -> TableToDataSteam(type=ROW<`id` STRING, `status` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (4/4)#0] INFO o.a.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, asset, project=[id, status]]], fields=[id, status]) -> Calc(select=[CAST(_UTF-16LE'sergey-test-asset-id':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS id, status], where=[(id = _UTF-16LE'sergey-test-asset-id')]) -> TableToDataSteam(type=ROW<`id` STRING, `status` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (4/4)#0 (e8870cf296ac770346384fe2529b325f) switched from RUNNING to FINISHED.
...
14:57:52.963 [Checkpoint Timer] INFO o.a.f.r.c.CheckpointCoordinator - Failed to trigger checkpoint for job 8303c423dd7b9e3f303f0b299d7d37bb because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
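I understand that Flink switches the JDBC operator to FINISHED once the SQL select statement has completed, but I need the streaming application to keep running and to take checkpoints while it runs.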
Do I need to set the execution.checkpointing.checkpoints-after-tasks-finish.enabled: true property to solve my problem, or do I need to change my application graph?
Yes, you need to set
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
so that the job can still take checkpoints even though the JDBC source tasks have already finished.
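If the job is built in Java, the same key can also be set programmatically. The following is a minimal sketch under stated assumptions, not code from the original question or answer: it passes the key through a Flink `Configuration` when creating the `StreamExecutionEnvironment`, which suits a local run like the one in the question (when submitting to a cluster, putting the key in flink-conf.yaml is the more common route). The class name `CheckpointAfterFinishExample`, the helper `checkpointingConfig()`, the 60-second checkpoint interval and the omitted pipeline are hypothetical placeholders. As far as I know, Flink 1.15 and later enable this option by default, so the explicit setting matters mainly on 1.14.x.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical example class; requires flink-streaming-java and
// flink-table-api-java-bridge (1.14.x) on the classpath.
public class CheckpointAfterFinishExample {

    // The key from the answer; the same key can go into flink-conf.yaml.
    static final String CHECKPOINTS_AFTER_FINISH_KEY =
            "execution.checkpointing.checkpoints-after-tasks-finish.enabled";

    // Hypothetical helper: builds a Configuration that lets checkpoints continue
    // after bounded tasks (such as the JDBC table scan) have switched to FINISHED.
    static Configuration checkpointingConfig() {
        Configuration conf = new Configuration();
        conf.setString(CHECKPOINTS_AFTER_FINISH_KEY, "true");
        return conf;
    }

    public static void main(String[] args) {
        // Create the environment with the configuration above (local execution).
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(checkpointingConfig());

        // Assumed interval: trigger a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000L);

        // The Table API parts of the job (JDBC source, select/where, toDataStream)
        // are translated into this same environment, so they run in the same job
        // and are covered by the checkpoint settings configured above.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // ... define the JDBC table, the Kafka stream and the join with tableEnv/env,
        // then call env.execute(...). Omitted in this sketch.
    }
}
```

Setting the option on the environment keeps it scoped to this one job, whereas flink-conf.yaml applies it to every job submitted with that configuration.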