如何验证在 Apache flink 中工作的增量检查点?

How to verify incremental checkpoints working in Apache flink?

我在我的 flink 代码中使用 RocksDB 作为状态后端实现增量检查点,但我想知道增量检查点是否正在发生,我的意思是有没有办法检查日志或 flink仪表板是执行增量检查点还是完整检查点

  1. 我根据 flink 文档使用 flink 版本 1.10.0 我看到日志记录机制在 Flink 版本 1.10.0 中被禁用 我遵循了这个 Ververica link 启用 RocksDB 日志 下面是我使用的启用日志记录的代码
    import static org.apache.flink.configuration.ConfigOptions.key;
    
    import java.util.Collection;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
    import org.rocksdb.DBOptions;
    import org.rocksdb.InfoLogLevel;
    
    public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
        private static final long serialVersionUID = 1L;
    
        private String dbLogDir = "";
    
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions,
                                         Collection<AutoCloseable> handlesToClose) {
            currentOptions = super.createDBOptions(currentOptions, handlesToClose);
    
            currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
            currentOptions.setStatsDumpPeriodSec(60);
            currentOptions.setDbLogDir(dbLogDir);
    
            return currentOptions;
        }
    
        @Override
        public String toString() {
            return this.getClass().toString() + "{" + super.toString() + '}';
        }
    
        /**
         * Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
         * data directory's absolute path will be used as the log file prefix).
         */
        public void setDbLogDir(String dbLogDir) {
            this.dbLogDir = dbLogDir;
        }
    
        public static final ConfigOption<String> LOG_DIR =
                key("state.backend.rocksdb.log.dir")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the " +
                                "data directory's absolute path will be used as the log file prefix)");
    
        @Override
        public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
            DefaultConfigurableOptionsFactory optionsFactory =
                    super.configure(configuration);
    
            this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
    
            return optionsFactory;
        }

我在代码中做了以下设置以启用日志记录

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.enableCheckpointing(interval);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    RocksDBStateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath,true);
    
    DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
    options.setDbLogDir("file:///mnt/flink/storage/rocksdb/logging/");
    
    env.setStateBackend(stateBackend);
    
    stateBackend.setRocksDBOptions(options);

我在我的 flink 配置文件中添加了以下 2 设置以启用 RocksDB 日志记录

    state.backend.rocksdb.log.dir: "file:///mnt/flink/storage/rocksdb/logging/"
    state.backend.rocksdb.options-factory: com.myflinkcode.common.config.DefaultConfigurableOptionsFactoryWithLog

我浏览了完整的 flink 仪表板,但我没有得到任何线索来检查是增量检查点正在发生还是完整检查点正在发生。请帮助我如何为 RocksDB 设置日志记录以了解增量检查点是否正在发生。我在文档中看到 RocksDB 日志记录将导致巨大的性能成本以及用于测试目的的存储,之后我将禁用它

我不确定此信息是否记录或显示在任何地方,但您可以在您的代码中使用

stateBackend.isIncrementalCheckpointsEnabled()

确定您的 RocksDB 状态后端是否启用了检查点,然后自己记录此信息。

请注意,要启用增量检查点(默认情况下关闭),您需要配置

state.backend.incremental: true