Why doesn't a standalone HA Flink cluster save checkpoints to the `state.backend.fs.checkpointdir` directory?

I am running a standalone HA Flink cluster. It creates a checkpoint for my stream every minute, but I don't see the checkpoints in the state.backend.fs.checkpointdir directory.

flink-conf.yaml

jobmanager.heap.mb: 1024
jobmanager.web.port: 8081

taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager

blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage

parallelism.default: 4

state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints

restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s

recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example_staging/flink
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
recovery.jobmanager.port: 6123

fs.hdfs.hadoopconf: /flink/conf

As you can see, checkpoints should be saved to the s3a://example-staging-flink/checkpoints directory, but I don't see them there:

~ s3cmd ls s3://example-staging-flink/
                       DIR   s3://example-staging-flink/recovery/
~ s3cmd ls s3://example-staging-flink/recovery/
                       DIR   s3://example-staging-flink/recovery/blob/
2016-04-15 10:33   1137280   s3://example-staging-flink/recovery/completedCheckpoint6eab84c79b02
2016-04-15 01:23    506961   s3://example-staging-flink/recovery/completedCheckpoint9e8f3d1254aa
2016-04-15 09:39    149987   s3://example-staging-flink/recovery/submittedJobGraph0bf82ada1dc6
~ s3cmd ls s3://example-staging-flink/recovery/blob/
                       DIR   s3://example-staging-flink/recovery/blob/cache/
~ s3cmd ls s3://example-staging-flink/recovery/blob/cache/
2016-04-14 13:00   3023995   s3://example-staging-flink/recovery/blob/cache/blob_0b6e57360c05128b3c91d75341785df64b91217b
2016-04-15 09:39   3066784   s3://example-staging-flink/recovery/blob/cache/blob_3ef7422ce7b5e5cbf1f031b0de1561159109d7f9
2016-04-14 12:54   3023898   s3://example-staging-flink/recovery/blob/cache/blob_5062028a8cab14daaeb19e51f01a02da3a8e515a
2016-04-14 12:29   3025864   s3://example-staging-flink/recovery/blob/cache/blob_7809e559953291cab482e9cf3324457ad07d6d05

The JobManager log contains the following entries:

2016-04-21 12:34:55,684 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory     - Using job manager savepoint state backend.
2016-04-25 01:13:14,569 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Initialized in '/checkpoints/a5f89242c729190e46baf409768960fb'.
2016-04-25 01:13:14,581 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator  - Create CheckpointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO  org.apache.flink.runtime.checkpoint.SavepointCoordinatorDeActivator  - Create SavepointCoordinatorDeActivator
2016-04-25 01:13:14,583 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Recovering checkpoints from ZooKeeper.
2016-04-25 01:13:14,594 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Found 1 checkpoints in ZooKeeper.
2016-04-25 01:13:14,875 INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - Initialized with Checkpoint 1015 @ 1461546663803 for a5f89242c729190e46baf409768960fb. Removing all older checkpoints.
2016-04-25 01:18:15,247 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1016 @ 1461547095238
2016-04-25 01:18:18,955 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1016 (in 153 ms)
2016-04-25 01:23:15,242 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1017 @ 1461547395238
2016-04-25 01:23:17,357 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1017 (in 138 ms)
2016-04-25 01:28:15,244 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1018 @ 1461547695239
2016-04-25 01:28:18,300 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1018 (in 101 ms)

So, can anyone explain why a standalone HA Apache Flink cluster doesn't save checkpoints to the configured storage?

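Flink does not store the actual state in files if the state is smaller than a given threshold. The default threshold (adjustable via state.backend.fs.memory-threshold) is 1024 bytes. Below this threshold, the state is stored together with the checkpoint metadata.

The idea behind this threshold is that writing small state to disk is relatively expensive when using a distributed file system. The metadata has to be written anyway, so storing the small state alongside it only adds a little more data to that write.

Setting state.backend.fs.memory-threshold: 0 should always write the state to your checkpoint directory, regardless of its size.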
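
To make the rule above concrete, here is a minimal toy model in plain Java. It is not Flink's code: the class name, the enum, and the method are hypothetical, and treating "below the threshold" as a strict comparison is an assumption, so the behavior at exactly the threshold may differ in Flink itself.

```java
// Toy model only: this is not Flink's implementation, just the decision rule
// described in the answer above. All names here are hypothetical.
final class CheckpointThresholdSketch {

    // Default value of state.backend.fs.memory-threshold quoted in the answer, in bytes.
    static final long DEFAULT_THRESHOLD_BYTES = 1024;

    enum Placement { INLINE_WITH_METADATA, FILE_IN_CHECKPOINT_DIR }

    // Assumption: "below the threshold" is modelled as a strict comparison.
    static Placement placementFor(long stateSizeBytes, long thresholdBytes) {
        return stateSizeBytes < thresholdBytes
                ? Placement.INLINE_WITH_METADATA
                : Placement.FILE_IN_CHECKPOINT_DIR;
    }

    public static void main(String[] args) {
        long[] sizes = {200, 5000};
        for (long size : sizes) {
            // With the default threshold, only the larger state goes to a file.
            System.out.println(size + " bytes, default threshold: "
                    + placementFor(size, DEFAULT_THRESHOLD_BYTES));
            // With the threshold set to 0, non-empty state always goes to a file.
            System.out.println(size + " bytes, threshold 0: "
                    + placementFor(size, 0));
        }
    }
}
```

In this model a 200-byte state stays with the metadata under the default threshold, which would explain an empty checkpoint directory for a job with very small state, while a threshold of 0 sends the same state to a file.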

I found the following message in the TaskManager log:

2016-05-06 10:08:30,591 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using user-defined state backend: MemoryStateBackend (data in heap memory / checkpoints to JobManager)

So I had simply forgotten to remove the following line from my code:

env.setStateBackend(new MemoryStateBackend());

Once I removed it and redeployed my stream, Flink started writing checkpoints to the file system directory specified by the state.backend.fs.checkpointdir parameter.