Apache Flink:如何将自定义逻辑应用于延迟事件?

Apache Flink: How to apply custom logic to late events?

尽管 Flink 有一些内置工具来处理延迟数据,比如 allowed lateness,但我还是想自己处理延迟数据。例如,我想监控延迟事件或将它们保存到数据库中。

我该怎么做?

通常在 window 运算符中使用延迟和水印。如果您使用的是 window 运算符,则可以像这样使用 sideoutput:

val windowStream = eventStream.keyBy(output => output.rule)
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
  .sideOutputLateData(lateOutputTag)

然后像这样从 sideoutput 中获取迟到的元素:

windowStream.getSideOutput(lateOutputTag).print()

ProcessFunctions(ProcessFunctionKeyedProcessFunction 等)通过 Context 对象提供对记录的事件时间戳和 TimerService 的访问。 TimerService 可以访问当前水印。

您可以通过比较事件时间戳和水印来识别延迟记录。如果时间戳小于或等于水印,则事件延迟。

如何处理延迟事件取决于您。您可以标记它们,您可以丢弃它们,通过侧面输出发出它们,或者用它们执行任何类型的计算。