Apache Flink:如何将自定义逻辑应用于延迟事件?
Apache Flink: How to apply custom logic to late events?
尽管 Flink 有一些内置工具来处理延迟数据,比如 allowed lateness,但我还是想自己处理延迟数据。例如,我想监控延迟事件或将它们保存到数据库中。
我该怎么做?
通常在 window 运算符中使用延迟和水印。如果您使用的是 window 运算符,则可以像这样使用 sideoutput:
val windowStream = eventStream.keyBy(output => output.rule)
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
.sideOutputLateData(lateOutputTag)
然后像这样从 sideoutput 中获取迟到的元素:
windowStream.getSideOutput(lateOutputTag).print()
ProcessFunctions(ProcessFunction
、KeyedProcessFunction
等)通过 Context
对象提供对记录的事件时间戳和 TimerService
的访问。 TimerService
可以访问当前水印。
您可以通过比较事件时间戳和水印来识别延迟记录。如果时间戳小于或等于水印,则事件延迟。
如何处理延迟事件取决于您。您可以标记它们,您可以丢弃它们,通过侧面输出发出它们,或者用它们执行任何类型的计算。
尽管 Flink 有一些内置工具来处理延迟数据,比如 allowed lateness,但我还是想自己处理延迟数据。例如,我想监控延迟事件或将它们保存到数据库中。
我该怎么做?
通常在 window 运算符中使用延迟和水印。如果您使用的是 window 运算符,则可以像这样使用 sideoutput:
val windowStream = eventStream.keyBy(output => output.rule)
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
.sideOutputLateData(lateOutputTag)
然后像这样从 sideoutput 中获取迟到的元素:
windowStream.getSideOutput(lateOutputTag).print()
ProcessFunctions(ProcessFunction
、KeyedProcessFunction
等)通过 Context
对象提供对记录的事件时间戳和 TimerService
的访问。 TimerService
可以访问当前水印。
您可以通过比较事件时间戳和水印来识别延迟记录。如果时间戳小于或等于水印,则事件延迟。
如何处理延迟事件取决于您。您可以标记它们,您可以丢弃它们,通过侧面输出发出它们,或者用它们执行任何类型的计算。