Flink missing state value on Kubernetes: resuming a job after the JobManager/TaskManager crashes
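We run a Flink job cluster (Deployment/Pod) on Kubernetes and deleted the JobManager and TaskManager pods (kubectl delete pod XXX). After the new pods were running and the job was working normally again, we found that the state from the previous pods was missing, even though the RocksDB and checkpoint file paths are mounted from a PVC.
Is there any way to restore the state once the pods are running again? I double-checked the code and found that checkpointing is not enabled. Is this the root cause of the job failing to recover its state?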
The environment is set up as follows:
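// checkpoint data URI; the second argument enables incremental checkpoints
RocksDBStateBackend backend = new RocksDBStateBackend(checkPointDataUri + "/checkpoint", true);
// local working directory for the RocksDB instances
backend.setDbStoragePath(checkPointDataUri + "/RocksDB");
backend.setNumberOfTransferingThreads(1);
// add state backend
env.setStateBackend((StateBackend) backend);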
Can we enable checkpointing like this?
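// trigger a checkpoint every 1000 ms
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// wait at least 500 ms after one checkpoint completes before starting the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// abort any checkpoint that takes longer than 60 s
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// keep the last checkpoint when the job is cancelled so it can be used to resume
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);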
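The suspicion in the question is consistent with how checkpointing works: without `env.enableCheckpointing(...)` no checkpoint is ever written, so a restarted job has nothing to restore from and starts with empty state, no matter which PVC is mounted. The Java toy model below illustrates that idea; it is not Flink code, and `CheckpointToyModel`, `DurableStore` and `CountingTask` are hypothetical names. It deliberately omits source replay: in a real Flink job, records processed after the last completed checkpoint are replayed from a replayable source such as Kafka. Note also that, as far as we know, enabling checkpointing alone only covers failures that the running JobManager recovers from; if the JobManager pod itself is deleted, the job additionally needs JobManager high availability or a resubmission from a retained checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model (NOT Flink code) of keyed state plus checkpoints.
 * Working state lives in a task that can be killed; only completed
 * checkpoints are copied to "durable" storage that survives the crash.
 */
public class CheckpointToyModel {

    /** Stands in for durable checkpoint storage (e.g. a distributed filesystem). */
    static final class DurableStore {
        private Map<String, Long> lastCompleted = null;

        void save(Map<String, Long> snapshot) {
            // Copy so later updates to working state do not leak into the snapshot.
            lastCompleted = new HashMap<>(snapshot);
        }

        Map<String, Long> latest() {
            if (lastCompleted == null) {
                return new HashMap<>(); // nothing was ever checkpointed
            }
            return new HashMap<>(lastCompleted);
        }
    }

    /** Stands in for an operator's working state on a TaskManager. */
    static final class CountingTask {
        private final Map<String, Long> counts;

        CountingTask(Map<String, Long> restored) {
            this.counts = restored;
        }

        void process(String key) {
            counts.merge(key, 1L, Long::sum);
        }

        void checkpoint(DurableStore store) {
            store.save(counts);
        }

        Map<String, Long> state() {
            return counts;
        }
    }

    /** Processes three events, optionally checkpointing after two, then "crashes" and restores. */
    static Map<String, Long> runAndRestore(boolean checkpointingEnabled) {
        DurableStore store = new DurableStore();
        CountingTask task = new CountingTask(store.latest());
        task.process("a");
        task.process("a");
        if (checkpointingEnabled) {
            task.checkpoint(store);
        }
        task.process("b"); // processed after the last checkpoint
        // Simulate "kubectl delete pod": the old task and its working state are gone.
        task = null;
        // A replacement task restores from whatever the durable store holds.
        CountingTask replacement = new CountingTask(store.latest());
        return replacement.state();
    }

    public static void main(String[] args) {
        System.out.println("without checkpoints: " + runAndRestore(false));
        System.out.println("with checkpoints:    " + runAndRestore(true));
    }
}
```

Without checkpointing the replacement task starts empty; with it, the task resumes from the state captured by the last completed checkpoint.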
Below is the log after the restart:
2020-06-09 06:48:11,921 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,921 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,962 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,941 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,962 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,941 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,921 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,963 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,942 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,965 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,961 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,965 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,942 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,981 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,944 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
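It doesn't make sense to put RocksDB and the checkpoints on the same filesystem. RocksDB should use the fastest available local filesystem; Kubernetes ephemeral storage is fine for that. The checkpoints, on the other hand, must be stored in some kind of durable, distributed filesystem.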
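Following this advice, once checkpointing is enabled with `RETAIN_ON_CANCELLATION` and the checkpoint directory lives on durable storage, a job that was restarted from scratch can be resumed from its last retained checkpoint by passing that checkpoint's directory to `flink run -s <path>` (for a standalone job cluster, the entrypoint's `--fromSavepoint` argument). The JDK-only sketch below locates the newest completed checkpoint. It assumes Flink's usual layout `<checkpoint dir>/<job id>/chk-<n>/_metadata`, and `LatestCheckpointFinder` is a hypothetical helper, not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/**
 * Hypothetical helper: finds the newest completed checkpoint under a checkpoint root.
 * Assumed layout: <checkpointRoot>/<jobId>/chk-<n>/_metadata
 */
public class LatestCheckpointFinder {

    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    /** Returns the chk-<n> directory with the highest n that contains a _metadata file. */
    public static Optional<Path> findLatest(Path checkpointRoot) throws IOException {
        if (!Files.isDirectory(checkpointRoot)) {
            return Optional.empty();
        }
        // Depth 1 = job-id directories, depth 2 = chk-<n>, shared, taskowned.
        try (Stream<Path> paths = Files.walk(checkpointRoot, 2)) {
            return paths
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName() != null
                            && CHK_DIR.matcher(p.getFileName().toString()).matches())
                    // Only completed checkpoints have their _metadata file written.
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    // Compare numeric ids so chk-12 beats chk-3.
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    private static long checkpointId(Path chkDir) {
        Matcher m = CHK_DIR.matcher(chkDir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a checkpoint directory: " + chkDir);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) throws IOException {
        // Default root taken from the restart log above; point it at the durable checkpoint location.
        Path root = Paths.get(args.length > 0 ? args[0] : "/opt/flink/data/ss-kpi-ewfj8/checkpoint");
        Optional<Path> latest = findLatest(root);
        System.out.println(latest.isPresent()
                ? "latest completed checkpoint: " + latest.get()
                : "no completed checkpoint found under " + root);
    }
}
```

Directories without `_metadata` (for example a checkpoint still in progress) are skipped. By default Flink retains only the latest completed checkpoint (`state.checkpoints.num-retained`), so usually a single `chk-<n>` directory is present per job.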