Flink CepOperator
Flink CepOperator
我正在阅读 Flink CEP 的 CepOperator
源代码,对以下代码片段有疑问:
public void processElement(StreamRecord<IN> element) throws Exception {
if (isProcessingTime) {
... ...
} else {
long timestamp = element.getTimestamp();
IN value = element.getValue();
if (timestamp > lastWatermark) {
saveRegisterWatermarkTimer();
bufferEvent(value, timestamp);
} else if (lateDataOutputTag != null) {
output.collect(lateDataOutputTag, element);
} else {
numLateRecordsDropped.inc();
}
}
}
我不明白为什么每次接收新元素时,saveRegisterWatermarkTimer()
都会被调用?这是源代码:
private void saveRegisterWatermarkTimer() {
long currentWatermark = timerService.currentWatermark();
// protect against overflow
if (currentWatermark + 1 > currentWatermark) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
}
}
它几乎总是注册一个新的事件时间计时器。它不会创建太多计时器吗?
感谢您提供更多解释。
为 currentWatermark + 1
注册一个计时器有点惯用,只要您想知道每个到达的水印(或者换句话说,每次事件时间时钟提前)都可以使用。 Flink 中的计时器会自动去重:对于任何一对 (key, timestamp) 最多只能有一个计时器,因此不会有创建太多计时器的风险。
我正在阅读 Flink CEP 的 CepOperator
源代码,对以下代码片段有疑问:
public void processElement(StreamRecord<IN> element) throws Exception {
if (isProcessingTime) {
... ...
} else {
long timestamp = element.getTimestamp();
IN value = element.getValue();
if (timestamp > lastWatermark) {
saveRegisterWatermarkTimer();
bufferEvent(value, timestamp);
} else if (lateDataOutputTag != null) {
output.collect(lateDataOutputTag, element);
} else {
numLateRecordsDropped.inc();
}
}
}
我不明白为什么每次接收新元素时,saveRegisterWatermarkTimer()
都会被调用?这是源代码:
private void saveRegisterWatermarkTimer() {
long currentWatermark = timerService.currentWatermark();
// protect against overflow
if (currentWatermark + 1 > currentWatermark) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
}
}
它几乎总是注册一个新的事件时间计时器。它不会创建太多计时器吗?
感谢您提供更多解释。
为 currentWatermark + 1
注册一个计时器有点惯用,只要您想知道每个到达的水印(或者换句话说,每次事件时间时钟提前)都可以使用。 Flink 中的计时器会自动去重:对于任何一对 (key, timestamp) 最多只能有一个计时器,因此不会有创建太多计时器的风险。