Flink 中的检查点不适用于 CoFlatMapFunction
Checkpointing in Flink is not working with CoFlatMapFunction
您好,我正在尝试在我使用 CoFlatMapFunction 的其中一个 flink 模块中执行检查点操作,如果我注释掉 CoFlatMapFunction 检查点功能,如果再次取消注释,则检查点功能不起作用。我在 flink 网站上将 Checkpointing 更新为 documentation,其中它说对于迭代流,添加了一个额外的属性以在执行此操作后强制执行检查点事件,但它也不起作用请在下面找到检查点设置
StateBackend stateBackend = new RocksDBStateBackend(path, true);
//env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,true);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.setStateBackend(stateBackend);
我可以看到一个任务状态已完成,但我无法看到日志,因为
我相信这样做的原因是 FLINK-2491:检查点仅在所有 operators/tasks 仍为 运行 时才有效。
您应该将从集合中注入一些数据的源替换为不会立即转换为完成的其他源,也许是一个自定义源一旦用完要发出的数据但什么都不做,source alive。
您好,我正在尝试在我使用 CoFlatMapFunction 的其中一个 flink 模块中执行检查点操作,如果我注释掉 CoFlatMapFunction 检查点功能,如果再次取消注释,则检查点功能不起作用。我在 flink 网站上将 Checkpointing 更新为 documentation,其中它说对于迭代流,添加了一个额外的属性以在执行此操作后强制执行检查点事件,但它也不起作用请在下面找到检查点设置
StateBackend stateBackend = new RocksDBStateBackend(path, true);
//env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,true);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.setStateBackend(stateBackend);
我可以看到一个任务状态已完成,但我无法看到日志,因为
我相信这样做的原因是 FLINK-2491:检查点仅在所有 operators/tasks 仍为 运行 时才有效。
您应该将从集合中注入一些数据的源替换为不会立即转换为完成的其他源,也许是一个自定义源一旦用完要发出的数据但什么都不做,source alive。