Flink CepOperator

Flink CepOperator

我正在阅读 Flink CEP 的 CepOperator 源代码,对以下代码片段有疑问:

public void processElement(StreamRecord<IN> element) throws Exception {
        if (isProcessingTime) {
            ... ...
        } else {

            long timestamp = element.getTimestamp();
            IN value = element.getValue();

        
            if (timestamp > lastWatermark) {
            
                saveRegisterWatermarkTimer();

                bufferEvent(value, timestamp);

            } else if (lateDataOutputTag != null) {
                output.collect(lateDataOutputTag, element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    } 

我不明白为什么每次接收新元素时,saveRegisterWatermarkTimer() 都会被调用?这是源代码:

private void saveRegisterWatermarkTimer() {
    long currentWatermark = timerService.currentWatermark();
    // protect against overflow
    if (currentWatermark + 1 > currentWatermark) {
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
    }
}

它几乎总是注册一个新的事件时间计时器。它不会创建太多计时器吗?

感谢您提供更多解释。

currentWatermark + 1 注册一个计时器有点惯用,只要您想知道每个到达的水印(或者换句话说,每次事件时间时钟提前)都可以使用。 Flink 中的计时器会自动去重:对于任何一对 (key, timestamp) 最多只能有一个计时器,因此不会有创建太多计时器的风险。