如何在 Flink 流处理窗口中收集延迟数据

How to gather late data in Flink Stream Processing Windowing

假设我有一个数据流,其中包含事件时间数据。我想在 8 毫秒的 window 时间内收集输入数据流并减少每个 window 数据。我使用以下代码执行此操作:

aggregatedTuple
          .keyBy( 0).timeWindow(Time.milliseconds(8))
          .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()

Point:数据流的关键是处理时间的时间戳映射到处理毫秒的时间戳的最后8个约数,例如 1531569851297 将映射到 1531569851296.

但也有可能数据流迟到,进入错误的window时间。例如,假设我将 window 时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎,或者至少延迟小于 window 时间(8 毫秒),这将是最好的情况。但是假设数据流事件时间(也是数据流中的一个字段)已经到达,延迟为 30 毫秒。所以它会输入错误的 window 我想如果我检查每个数据流的事件时间,因为它想输入 window,我可以过滤这么晚的数据。 所以我有两个问题:

Flink 有两个不同的相关抽象,它们处理计算的不同方面 windowed 对具有事件时间戳的流进行分析:watermarks允许迟到.

首先,水印,它在处理事件时间数据时发挥作用(无论您是否使用 windows)。水印向 Flink 提供有关事件时间进度的信息,并为应用程序编写者提供一种处理乱序数据的方法。水印随数据流流动,每个水印都标记了流中的一个位置并带有时间戳。水印用作断言,在流中的那个点,流现在(可能)完成到该时间戳 - 或者换句话说,跟随水印的事件不太可能在由水印指示的时间之前发生水印。最常见的水印策略是使用 BoundedOutOfOrdernessTimestampExtractor,它假定事件在某个固定的、有界的延迟内到达。

这现在提供了迟到的定义——时间戳小于水印时间戳的水印事件被认为是迟到

window API 提供了 允许迟到 的概念,默认设置为零。如果允许的迟到大于零,那么事件时间的默认触发器 windows 将接受迟到的事件到它们适当的 windows,直到允许的迟到的限制。 window 动作将在通常时间触发一次,然后为每个迟到事件触发一次,直到允许的迟到间隔结束。之后,延迟事件将被丢弃(如果配置了一个,则收集到侧面输出)。

How can I filter data stream as it wants to enter the window and check 
if the data created at the right timestamp for the window?

Flink 的 window 分配器负责将事件分配给适当的 windows —— 正确的事情会自动发生。将根据需要创建新的 window 个实例。

How can I gather such late data in a variable to do some processing on them?

您可以在水印中足够慷慨以避免有任何迟到的数据,and/or将允许的迟到配置得足够长以适应迟到的事件。但是请注意,Flink 将强制保持所有 windows 处于打开状态,这些 windows 仍在接受延迟事件,这将延迟垃圾收集旧 windows 并可能消耗大量内存。

请注意,此讨论假设您想要处理时间 windows——例如您正在使用的 8 毫秒长 windows。 Flink 还支持计数 windows(例如将事件分组为 100 个)、会话 windows 和自定义 window 逻辑。例如,如果您使用计数 windows,水印和迟到就不会起作用。

如果您想要针对每个键的分析结果,请在应用 windowing 之前使用 keyBy 按键(例如,按 userId)对流进行分区。例如

stream
  .keyBy(e -> e.userId)
  .timeWindow(Time.seconds(10))
  .reduce(...)

将为每个用户 ID 生成单独的结果。

更新:请注意,在最新版本的 Flink 中,windows 现在可以将延迟事件收集到侧面输出。

一些相关文档:

Event Time and Watermarks
Allowed Lateness