为什么 Standalone HA Flink 集群不将检查点保存到“state.backend.fs.checkpointdir”目录?
Why Standalone HA Flink cluster doesn't save checkpoints to `state.backend.fs.checkpointdir` directory?
我有 运行 独立的 HA Flink 集群,它每分钟都会为我的流程创建检查点,但我没有在 state.backend.fs.checkpointdir
目录中看到它们。
flink-conf.yaml
jobmanager.heap.mb: 1024
jobmanager.web.port: 8081
taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager
blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage
parallelism.default: 4
state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s
recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example_staging/flink
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
recovery.jobmanager.port: 6123
fs.hdfs.hadoopconf: /flink/conf
如您所见,检查点应保存到 s3a://example-staging-flink/checkpoints
目录,但我没有看到它们:
~ s3cmd ls s3://example-staging-flink/
DIR s3://example-staging-flink/recovery/
~ s3cmd ls s3://example-staging-flink/recovery/
DIR s3://example-staging-flink/recovery/blob/
2016-04-15 10:33 1137280 s3://example-staging-flink/recovery/completedCheckpoint6eab84c79b02
2016-04-15 01:23 506961 s3://example-staging-flink/recovery/completedCheckpoint9e8f3d1254aa
2016-04-15 09:39 149987 s3://example-staging-flink/recovery/submittedJobGraph0bf82ada1dc6
~ s3cmd ls s3://example-staging-flink/recovery/blob/
DIR s3://example-staging-flink/recovery/blob/cache/
~ s3cmd ls s3://example-staging-flink/recovery/blob/cache/
2016-04-14 13:00 3023995 s3://example-staging-flink/recovery/blob/cache/blob_0b6e57360c05128b3c91d75341785df64b91217b
2016-04-15 09:39 3066784 s3://example-staging-flink/recovery/blob/cache/blob_3ef7422ce7b5e5cbf1f031b0de1561159109d7f9
2016-04-14 12:54 3023898 s3://example-staging-flink/recovery/blob/cache/blob_5062028a8cab14daaeb19e51f01a02da3a8e515a
2016-04-14 12:29 3025864 s3://example-staging-flink/recovery/blob/cache/blob_7809e559953291cab482e9cf3324457ad07d6d05
JobManager 日志有以下日志:
2016-04-21 12:34:55,684 INFO org.apache.flink.runtime.checkpoint.SavepointStoreFactory - Using job manager savepoint state backend.
2016-04-25 01:13:14,569 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Initialized in '/checkpoints/a5f89242c729190e46baf409768960fb'.
2016-04-25 01:13:14,581 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator - Create CheckpointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO org.apache.flink.runtime.checkpoint.SavepointCoordinatorDeActivator - Create SavepointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2016-04-25 01:13:14,594 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2016-04-25 01:13:14,875 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Initialized with Checkpoint 1015 @ 1461546663803 for a5f89242c729190e46baf409768960fb. Removing all older checkpoints.
2016-04-25 01:18:15,247 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1016 @ 1461547095238
2016-04-25 01:18:18,955 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1016 (in 153 ms)
2016-04-25 01:23:15,242 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1017 @ 1461547395238
2016-04-25 01:23:17,357 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1017 (in 138 ms)
2016-04-25 01:28:15,244 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1018 @ 1461547695239
2016-04-25 01:28:18,300 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1018 (in 101 ms)
那么,谁能解释一下为什么 Apache Flink 的独立 HA 集群不将检查点保存到存储中?
如果 Flink 小于给定阈值,则不会将实际状态存储到文件中。默认阈值(可通过 state.backend.fs.memory-threshold
调整)为 1024 字节。低于此阈值,状态与检查点元数据一起存储。
这个阈值背后的想法是,在使用分布式文件系统时,将小状态写入磁盘的成本相对较高。无论如何都要写元数据,只是多存储一点数据。
设置 state.backend.fs.memory-threshold: 0
应该始终将状态写入您的检查点目录,无论其大小如何。
我在任务管理器日志中发现了以下日志消息:
2016-05-06 10:08:30,591 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: MemoryStateBackend (data in heap memory / checkpoints to JobManager)
已创建 here。
所以,我只是忘记从我的代码中删除以下行:
env.setStateBackend(new MemoryStateBackend());
当我删除它并重新部署我的流时,Flink 开始将检查点写入 state.backend.fs.checkpointdir
参数中指定的文件系统目录。
我有 运行 独立的 HA Flink 集群,它每分钟都会为我的流程创建检查点,但我没有在 state.backend.fs.checkpointdir
目录中看到它们。
flink-conf.yaml
jobmanager.heap.mb: 1024
jobmanager.web.port: 8081
taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager
blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage
parallelism.default: 4
state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s
recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example_staging/flink
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
recovery.jobmanager.port: 6123
fs.hdfs.hadoopconf: /flink/conf
如您所见,检查点应保存到 s3a://example-staging-flink/checkpoints
目录,但我没有看到它们:
~ s3cmd ls s3://example-staging-flink/
DIR s3://example-staging-flink/recovery/
~ s3cmd ls s3://example-staging-flink/recovery/
DIR s3://example-staging-flink/recovery/blob/
2016-04-15 10:33 1137280 s3://example-staging-flink/recovery/completedCheckpoint6eab84c79b02
2016-04-15 01:23 506961 s3://example-staging-flink/recovery/completedCheckpoint9e8f3d1254aa
2016-04-15 09:39 149987 s3://example-staging-flink/recovery/submittedJobGraph0bf82ada1dc6
~ s3cmd ls s3://example-staging-flink/recovery/blob/
DIR s3://example-staging-flink/recovery/blob/cache/
~ s3cmd ls s3://example-staging-flink/recovery/blob/cache/
2016-04-14 13:00 3023995 s3://example-staging-flink/recovery/blob/cache/blob_0b6e57360c05128b3c91d75341785df64b91217b
2016-04-15 09:39 3066784 s3://example-staging-flink/recovery/blob/cache/blob_3ef7422ce7b5e5cbf1f031b0de1561159109d7f9
2016-04-14 12:54 3023898 s3://example-staging-flink/recovery/blob/cache/blob_5062028a8cab14daaeb19e51f01a02da3a8e515a
2016-04-14 12:29 3025864 s3://example-staging-flink/recovery/blob/cache/blob_7809e559953291cab482e9cf3324457ad07d6d05
JobManager 日志有以下日志:
2016-04-21 12:34:55,684 INFO org.apache.flink.runtime.checkpoint.SavepointStoreFactory - Using job manager savepoint state backend.
2016-04-25 01:13:14,569 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Initialized in '/checkpoints/a5f89242c729190e46baf409768960fb'.
2016-04-25 01:13:14,581 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator - Create CheckpointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO org.apache.flink.runtime.checkpoint.SavepointCoordinatorDeActivator - Create SavepointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2016-04-25 01:13:14,594 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2016-04-25 01:13:14,875 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Initialized with Checkpoint 1015 @ 1461546663803 for a5f89242c729190e46baf409768960fb. Removing all older checkpoints.
2016-04-25 01:18:15,247 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1016 @ 1461547095238
2016-04-25 01:18:18,955 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1016 (in 153 ms)
2016-04-25 01:23:15,242 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1017 @ 1461547395238
2016-04-25 01:23:17,357 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1017 (in 138 ms)
2016-04-25 01:28:15,244 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1018 @ 1461547695239
2016-04-25 01:28:18,300 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1018 (in 101 ms)
那么,谁能解释一下为什么 Apache Flink 的独立 HA 集群不将检查点保存到存储中?
如果 Flink 小于给定阈值,则不会将实际状态存储到文件中。默认阈值(可通过 state.backend.fs.memory-threshold
调整)为 1024 字节。低于此阈值,状态与检查点元数据一起存储。
这个阈值背后的想法是,在使用分布式文件系统时,将小状态写入磁盘的成本相对较高。无论如何都要写元数据,只是多存储一点数据。
设置 state.backend.fs.memory-threshold: 0
应该始终将状态写入您的检查点目录,无论其大小如何。
我在任务管理器日志中发现了以下日志消息:
2016-05-06 10:08:30,591 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: MemoryStateBackend (data in heap memory / checkpoints to JobManager)
已创建 here。
所以,我只是忘记从我的代码中删除以下行:
env.setStateBackend(new MemoryStateBackend());
当我删除它并重新部署我的流时,Flink 开始将检查点写入 state.backend.fs.checkpointdir
参数中指定的文件系统目录。