Flink 从不调用 notifyCheckpointComplete
notifyCheckpointComplete is never called in Flink
我写了自己的运算符,扩展自 AbstractStreamOperator 和 OneInputStreamOperator
implements OneInputStreamOperator<GenericRecord, Void>
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
...
}
然后我转换我的 DataStream
OneOperator oneOperator = new OneOperator();
input.transform(oneOperator.getClass().getSimpleName(), Types.VOID, oneOperator)
.setParallelism(1)
.setMaxParallelism(1)
.addSink(new DiscardingSink<>())
.setParallelism(1)
.uid("oneOperator");
这样我就可以使用 oneOperator 做一些事情了。
在我的单元测试中,我将检查点设置为500ms,将autowatermarkInterval设置为10,然后将2个事件传递给输入,2个事件之间的时间戳相隔2小时,足以触发检查点。
因此,据我了解,一旦触发检查点,就会调用 notifyCheckpointComplete。
但是当我 运行 我的单元测试时,从未调用 notifyCheckpointComplete 函数。
我错过了什么吗?
谢谢。
检查点与水印无关(大多数时候):) 因此,当您将检查点设置为 500 毫秒时,处理时间将是 500 毫秒,因此您注入的数据并不重要。如果测试很快完成,可能不会触发检查点,因此您可能需要减少检查点时间或增加一些睡眠以等待检查点被触发。
我写了自己的运算符,扩展自 AbstractStreamOperator 和 OneInputStreamOperator
implements OneInputStreamOperator<GenericRecord, Void>
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
...
}
然后我转换我的 DataStream
OneOperator oneOperator = new OneOperator();
input.transform(oneOperator.getClass().getSimpleName(), Types.VOID, oneOperator)
.setParallelism(1)
.setMaxParallelism(1)
.addSink(new DiscardingSink<>())
.setParallelism(1)
.uid("oneOperator");
这样我就可以使用 oneOperator 做一些事情了。
在我的单元测试中,我将检查点设置为500ms,将autowatermarkInterval设置为10,然后将2个事件传递给输入,2个事件之间的时间戳相隔2小时,足以触发检查点。
因此,据我了解,一旦触发检查点,就会调用 notifyCheckpointComplete。
但是当我 运行 我的单元测试时,从未调用 notifyCheckpointComplete 函数。
我错过了什么吗?
谢谢。
检查点与水印无关(大多数时候):) 因此,当您将检查点设置为 500 毫秒时,处理时间将是 500 毫秒,因此您注入的数据并不重要。如果测试很快完成,可能不会触发检查点,因此您可能需要减少检查点时间或增加一些睡眠以等待检查点被触发。