有什么方法可以确保所有 CheckpointListeners 都在使用保存点取消作业时收到有关 Flink 上检查点完成的通知?
Is there any way to ensure all CheckpointListeners notified about checkpoint completion on Flink on job cancel with savepoint?
我正在使用 flink 1.9 和 REST API /jobs/:jobid/savepoints
来触发保存点并取消作业(稍后从保存点优雅地停止作业到 运行)。
我在源代码函数中使用了两阶段提交,因此我的源代码实现了 CheckpointedFunction
和 CheckpointListener
接口。在 snapshotState()
方法调用上,我对内部状态进行快照,在 notifyCheckpointComplete()
上,我将状态检查点发送到第 3 方系统。
据我从源代码中看到的,只有 snapshotState()
部分在 CheckpointCoordinator
-
中是同步的
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
检查点确认和完成通知在 AsyncCheckpointRunnable
中是异步的。
也就是说,当 cancel-job
设置为 true
的 savepoint
被触发时,在拍摄快照后,一些任务管理器会在之前保持接收完成通知作业取消并执行 notifyCheckpointComplete()
,有些则不会。
问题是是否有一种方法可以使用保存点取消作业,从而保证 notifyCheckpointComplete()
在作业取消之前被所有任务管理器调用,或者目前没有办法实现这一点?
好久没看 Flink 1.9 了,请慎重接受我的回答。
我猜你的消息来源取消得太早了。所以 notifyCheckpointComplete
实际上被发送到所有任务,但是一些 SourceFunction
已经退出 run
并且相应的任务被清理。
Afaik,如果您在收到最后一个 notifyCheckpointComplete
.
之前忽略取消和中断,那么您所描述的应该是可能的
class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
private volatile boolean canceled = false;
private volatile boolean pendingCheckpoint = false;
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
pendingCheckpoint = true;
// start two-phase commit
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// finish two-phase commit
pendingCheckpoint = false;
}
@Override
public void run(SourceContext<Object> ctx) throws Exception {
while (!canceled) {
// do normal source stuff
}
// keep the task running after cancellation
while (pendingCheckpoint) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// ignore interruptions until two-phase commit is done
}
}
}
@Override
public void cancel() {
canceled = true;
}
}
使用 stop-with-savepoint[1][2] 不能解决问题吗?
[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
[2]https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
我正在使用 flink 1.9 和 REST API /jobs/:jobid/savepoints
来触发保存点并取消作业(稍后从保存点优雅地停止作业到 运行)。
我在源代码函数中使用了两阶段提交,因此我的源代码实现了 CheckpointedFunction
和 CheckpointListener
接口。在 snapshotState()
方法调用上,我对内部状态进行快照,在 notifyCheckpointComplete()
上,我将状态检查点发送到第 3 方系统。
据我从源代码中看到的,只有 snapshotState()
部分在 CheckpointCoordinator
-
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
检查点确认和完成通知在 AsyncCheckpointRunnable
中是异步的。
也就是说,当 cancel-job
设置为 true
的 savepoint
被触发时,在拍摄快照后,一些任务管理器会在之前保持接收完成通知作业取消并执行 notifyCheckpointComplete()
,有些则不会。
问题是是否有一种方法可以使用保存点取消作业,从而保证 notifyCheckpointComplete()
在作业取消之前被所有任务管理器调用,或者目前没有办法实现这一点?
好久没看 Flink 1.9 了,请慎重接受我的回答。
我猜你的消息来源取消得太早了。所以 notifyCheckpointComplete
实际上被发送到所有任务,但是一些 SourceFunction
已经退出 run
并且相应的任务被清理。
Afaik,如果您在收到最后一个 notifyCheckpointComplete
.
class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
private volatile boolean canceled = false;
private volatile boolean pendingCheckpoint = false;
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
pendingCheckpoint = true;
// start two-phase commit
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// finish two-phase commit
pendingCheckpoint = false;
}
@Override
public void run(SourceContext<Object> ctx) throws Exception {
while (!canceled) {
// do normal source stuff
}
// keep the task running after cancellation
while (pendingCheckpoint) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// ignore interruptions until two-phase commit is done
}
}
}
@Override
public void cancel() {
canceled = true;
}
}
使用 stop-with-savepoint[1][2] 不能解决问题吗?
[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop [2]https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html