有什么方法可以确保所有 CheckpointListeners 都在使用保存点取消作业时收到有关 Flink 上检查点完成的通知?

Is there any way to ensure all CheckpointListeners notified about checkpoint completion on Flink on job cancel with savepoint?

我正在使用 flink 1.9 和 REST API /jobs/:jobid/savepoints 来触发保存点并取消作业(稍后从保存点优雅地停止作业到 运行)。

我在源代码函数中使用了两阶段提交,因此我的源代码实现了 CheckpointedFunctionCheckpointListener 接口。在 snapshotState() 方法调用上,我对内部状态进行快照,在 notifyCheckpointComplete() 上,我将状态检查点发送到第 3 方系统。

据我从源代码中看到的,只有 snapshotState() 部分在 CheckpointCoordinator -

中是同步的
// send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    if (props.isSynchronous()) {
                        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                    } else {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
                }

检查点确认和完成通知在 AsyncCheckpointRunnable 中是异步的。

也就是说,当 cancel-job 设置为 truesavepoint 被触发时,在拍摄快照后,一些任务管理器会在之前保持接收完成通知作业取消并执行 notifyCheckpointComplete(),有些则不会。

问题是是否有一种方法可以使用保存点取消作业,从而保证 notifyCheckpointComplete() 在作业取消之前被所有任务管理器调用,或者目前没有办法实现这一点?

好久没看 Flink 1.9 了,请慎重接受我的回答。

我猜你的消息来源取消得太早了。所以 notifyCheckpointComplete 实际上被发送到所有任务,但是一些 SourceFunction 已经退出 run 并且相应的任务被清理。

Afaik,如果您在收到最后一个 notifyCheckpointComplete.

之前忽略取消和中断,那么您所描述的应该是可能的
class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pendingCheckpoint = true;
        // start two-phase commit
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // finish two-phase commit
        pendingCheckpoint = false;
    }

    @Override
    public void run(SourceContext<Object> ctx) throws Exception {
        while (!canceled) {
            // do normal source stuff
        }
        // keep the task running after cancellation
        while (pendingCheckpoint) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // ignore interruptions until two-phase commit is done
            }
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}

使用 stop-with-savepoint[1][2] 不能解决问题吗?

[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop [2]https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html