Flink Windows 边界、水印、事件时间戳和处理时间
Flink Windows Boundaries, Watermark, Event Timestamp & Processing Time
问题定义和概念建立
假设我们有一个 TumblingEventTimeWindow,大小为 5 分钟。我们的活动包含 2 条基本信息:
- 个数
- 事件时间戳
在这个例子中,我们在工作机器的挂钟时间 12:00 PM 启动我们的 Flink 拓扑(当然是工作机器可以有不同步的时钟,但这超出了这个问题的范围)。此拓扑包含一个处理运算符,其职责是总结属于每个window的事件值和一个KAFKA Sink与这个问题无关。
- 此 window 具有 BoundedOutOfOrdernessTimestampExtractor 允许的延迟 一分钟 。
- Watermark:据我了解,Flink 和 Spark Structured Stream 中的水印定义为 (max-event-timestamp-seen-so-far - allowed -迟到)。任何事件时间戳小于或等于此水印的事件都将被丢弃并在结果计算中被忽略。
第 1 部分(确定 Window 的边界)
快乐(实时)路径
在这种情况下,多个事件到达 Flink Operator,具有跨越 12:01 - 12:09
的不同事件时间戳。此外,事件时间戳与我们的 处理时间 相对一致(如下面的 X 轴所示)。由于我们处理的是 EVENT_TIME 特征,偶数是否属于特定事件应该通过其 事件时间戳 来确定.
enter image description here
旧数据涌入
在那个流程中,我假设两个翻滚windows[=229]的边界 =] 是 12:00 -- 12:05
和 12:05 -- 12:10
只是因为我们在 12:00 开始执行拓扑。如果这个假设是正确的(我希望不是),那么在回填情况下会发生什么情况,其中几个 old 事件与 much older[=229] 一起出现=] 事件时间戳,我们又在 12:00 开始了拓扑? (年龄大到我们的迟到津贴不包括他们)。类似于以下内容:
enter image description here
- 如果是那样的话,那么我们的事件将不会被捕获到 any window 当然,所以我再次希望这不是行为:)
- 另一种选择是通过到达事件的事件时间戳来确定windows'边界。如果是这样,那将如何工作?注意到的 最小事件时间戳 成为第一个 window 的开始,然后根据大小从那里开始(在本例中 5 分钟 ) ,由此产生的边界是确定的?因为这种做法也会有缺陷和漏洞。您能否解释一下这是如何工作的以及如何确定 windows 的边界?
回填事件涌入
上一个问题的答案也将解决这个问题,但我认为在这里明确提及它会有所帮助。假设我有 TumblingEventTimeWindow 大小 5 分钟 。然后在 12:00 我开始了一个回填工作,它在许多事件中涌入了时间戳覆盖范围 10:02 - 10:59
的 Flink 算子;但由于这是回填作业,整个执行过程 大约需要 3 分钟 完成。
作业是否会分配 12 个单独的 windows 并根据事件的事件时间戳正确填充它们?那些 12 windows 的界限是什么?我最终会得到 12 个输出事件,每个输出事件 总计 每个已分配 window 的值吗?
第 2 部分(Unit/Integration 此类有状态运算符的测试)
我对此类逻辑和运算符的自动化测试也有一些担忧。操纵 处理时间 的最佳方式,以形成所需 windows 边界以用于测试目的的方式触发某些行为。特别是因为到目前为止我读过的关于利用 Test Harnesses
的内容似乎有点令人困惑,并且可能导致一些混乱的代码,这不是那么容易阅读:
参考文献
我在这方面学到的大部分知识和我的一些困惑的根源可以在以下地方找到:
- Timestmap Extractors & Watermark Emitters
- Event Time Processing & Watermarking
- Handling Late Data & Watermarking in Spark
- Spark 文档那部分中的 images 非常有帮助和教育意义。但与此同时,windows' 边界与那些 处理时间 和 not 事件时间戳对齐的方式给我造成了一些困惑.
- 此外,在该可视化中,watermark 似乎每 5 分钟 计算一次,因为这是 5 分钟 的滑动规范=368=]。这是应该多久计算一次 watermark 的决定因素吗?对于不同的 windows(例如
Tumbling
、Sliding
、Session
等),这在 Flink 中如何工作?!
非常感谢您的帮助,如果您知道关于这些概念及其内部工作的任何更好的参考,请告诉我。
@snntrable 回答后更新
If you run a Job with event time semantics, the processing time at the window operators is completely irrelevant
说得对,我理解那部分。一旦你处理了 EVENT_TIME
特征,你就几乎脱离了 semantics/logic 中的 处理时间 。我提出 处理时间 的原因是我对以下关键问题感到困惑,这对我来说仍然是个谜:
windows'边界是如何计算的?!
此外,非常感谢您澄清 out-of-orderness
和 lateness
之间的区别。我正在处理的代码用词不当(从 BoundedOutOfOrdernessTimestampExtractor
继承的 class 的构造函数参数被命名为 maxLatency
):/
考虑到这一点,让我看看关于如何计算水印和何时事件是否正确将被丢弃(或侧面输出):
- 无序分配器
- 当前水印=
max-event-time-seen-so-far - max-out-of-orderness-allowed
- 允许迟到
- 当前水印=
max-event-time-seen-so-far - allowed-lateness
- 正常流量
- 当前水印=
max-event-time-seen-so-far
并且在任何这些情况中,无论事件时间戳 小于或等于[=229]的事件=]到current-watermark
,会被丢弃(侧输出),对吗?!
这带来了一个新问题。您什么时候想使用 out of orderness
而不是 lateness
?由于 current watermark 计算(数学上)在这些情况下可以相同。当你使用 both 时会发生什么(这有意义吗)?!
回到Windows'边界
这对我来说仍然是主要的谜团。鉴于以上所有讨论,让我们重新审视我提供的具体示例,看看 windows' 边界 是如何确定的。假设我们有以下场景(事件的形状为 (value, timestamp)
):
- 接线员在 12:00 PM 开始(即 处理时间)
- 事件按以下顺序到达操作员
- (1, 8:29)
- (5, 8:26)
- (3, 9:48)
- (7, 9:46)
- 我们有一个 TumblingEventTimeWindow,大小为 5 分钟
- window 应用到
DataStream
和 BoundedOutOfOrdernessTimestampExtractor
有 2 分钟 maxOutOfOrderness
- 还有,window配置了
allowedLateness
的1分钟
注意:如果您不能同时拥有 out of orderness
和 lateness
或者 没有 有意义,请仅 考虑上面示例中的 out of orderness
。
最后,请您布置windows,其中将分配一些事件给它们和,请指定边界 windows(beginning and end window 的时间戳).我假设边界也由 事件的时间戳决定,但在像这样的具体示例中弄清楚它们有点棘手。
再次非常感谢您的帮助:)
原答案
Watermark: To my understanding, watermark in Flink and Spark Structured Stream is defined as (max-event-timestamp-seen-so-far - allowed-lateness). Any event whose event timestamp is less than or equal to this watermark will be discarded and ignored in result computations.
这是不正确的,可能是造成混淆的原因。 Out-of-Orderness 和 Lateness 在 Flink 中是不同的概念。对于 BoundedOutOfOrdernessTimestampExtractor
,水印是 max-event-timestamp-seen-so-far - max-out-of-orderness
。 Flink 文档 [1].
中有关允许延迟的更多信息
如果您 运行 具有事件时间语义的作业,则 window 运算符的处理时间完全无关紧要:
- 事件将根据事件时间戳
分配给它们的windows
- 时间 windows 将在水印达到其最大时间戳 (
window end time -1
) 时触发。
- 时间戳早于
current watermark - allowed lateness
的事件将被丢弃或发送到延迟数据端输出 [1]
这意味着,如果您在 12:00pm(处理时间)开始工作并开始从过去摄取数据,水印也将(甚至更远)过去。所以,配置的allowedLateness
是无关紧要的,因为数据没有偶数时间延迟。
另一方面,如果您首先从 12:00pm 中摄取一些数据,然后从 10:00pm 中摄取数据,水印将在您摄取旧数据之前已经前进到 ~12:00pm数据。在这种情况下,来自 10:00pm 的数据将“延迟”。如果它晚于配置的allowedLateness
(默认=0),它被丢弃(默认)或发送到侧面输出(如果配置)[1]。
跟进答案
事件时间 window 的时间表如下:
- a window 中具有时间戳的第一个元素到达 -> 已创建此 window(& 密钥)的状态
watermark >= window_endtime - 1
到达 -> window 被触发(结果被发出),但状态没有被丢弃
watermark >= window_endtime + allowed_latenes
到达 -> 状态被丢弃
此 window 的 2. 和 3. 事件迟到了,但在允许的迟到范围内。事件被添加到现有状态,并且 - 默认情况下 - window 在每条记录上触发,发出经过优化的结果。
在 3. 之后,此 window 的事件将被丢弃(或发送到后期输出接收器)。
所以,是的,配置两者是有意义的。乱序决定了 window 何时首次触发,而允许的迟到决定了状态保持多长时间以潜在地更新结果。
关于边界:翻滚事件时间 windows 具有固定长度,跨键对齐并从 unix 纪元开始。空windows,不存在。对于您的示例,这意味着:
- (1, 8:29) 添加到 window (8:25 - 8:29:59:999)
- (5, 8:26) 添加到 window (8:25 - 8:29:59:999)
- (3, 9:48) 添加到 window (9:45 - 9:49:59:999)
- (8:25 - 8:29:59:999) 被触发,因为水印已经前进到 9:48-0:02=9:46,比上一个大window 的时间戳。 window状态也被丢弃,因为watermark已经提前到9:46,也大于window的结束时间+允许迟到(1分钟)
- (7, 9:46) 添加到 window 添加到 window (9:45 - 9:49:59:999)
希望这对您有所帮助。
康斯坦丁
问题定义和概念建立
假设我们有一个 TumblingEventTimeWindow,大小为 5 分钟。我们的活动包含 2 条基本信息:
- 个数
- 事件时间戳
在这个例子中,我们在工作机器的挂钟时间 12:00 PM 启动我们的 Flink 拓扑(当然是工作机器可以有不同步的时钟,但这超出了这个问题的范围)。此拓扑包含一个处理运算符,其职责是总结属于每个window的事件值和一个KAFKA Sink与这个问题无关。
- 此 window 具有 BoundedOutOfOrdernessTimestampExtractor 允许的延迟 一分钟 。
- Watermark:据我了解,Flink 和 Spark Structured Stream 中的水印定义为 (max-event-timestamp-seen-so-far - allowed -迟到)。任何事件时间戳小于或等于此水印的事件都将被丢弃并在结果计算中被忽略。
第 1 部分(确定 Window 的边界)
快乐(实时)路径
在这种情况下,多个事件到达 Flink Operator,具有跨越 12:01 - 12:09
的不同事件时间戳。此外,事件时间戳与我们的 处理时间 相对一致(如下面的 X 轴所示)。由于我们处理的是 EVENT_TIME 特征,偶数是否属于特定事件应该通过其 事件时间戳 来确定.
enter image description here
旧数据涌入
在那个流程中,我假设两个翻滚windows[=229]的边界 =] 是 12:00 -- 12:05
和 12:05 -- 12:10
只是因为我们在 12:00 开始执行拓扑。如果这个假设是正确的(我希望不是),那么在回填情况下会发生什么情况,其中几个 old 事件与 much older[=229] 一起出现=] 事件时间戳,我们又在 12:00 开始了拓扑? (年龄大到我们的迟到津贴不包括他们)。类似于以下内容:
enter image description here
- 如果是那样的话,那么我们的事件将不会被捕获到 any window 当然,所以我再次希望这不是行为:)
- 另一种选择是通过到达事件的事件时间戳来确定windows'边界。如果是这样,那将如何工作?注意到的 最小事件时间戳 成为第一个 window 的开始,然后根据大小从那里开始(在本例中 5 分钟 ) ,由此产生的边界是确定的?因为这种做法也会有缺陷和漏洞。您能否解释一下这是如何工作的以及如何确定 windows 的边界?
回填事件涌入
上一个问题的答案也将解决这个问题,但我认为在这里明确提及它会有所帮助。假设我有 TumblingEventTimeWindow 大小 5 分钟 。然后在 12:00 我开始了一个回填工作,它在许多事件中涌入了时间戳覆盖范围 10:02 - 10:59
的 Flink 算子;但由于这是回填作业,整个执行过程 大约需要 3 分钟 完成。
作业是否会分配 12 个单独的 windows 并根据事件的事件时间戳正确填充它们?那些 12 windows 的界限是什么?我最终会得到 12 个输出事件,每个输出事件 总计 每个已分配 window 的值吗?
第 2 部分(Unit/Integration 此类有状态运算符的测试)
我对此类逻辑和运算符的自动化测试也有一些担忧。操纵 处理时间 的最佳方式,以形成所需 windows 边界以用于测试目的的方式触发某些行为。特别是因为到目前为止我读过的关于利用 Test Harnesses
的内容似乎有点令人困惑,并且可能导致一些混乱的代码,这不是那么容易阅读:
参考文献
我在这方面学到的大部分知识和我的一些困惑的根源可以在以下地方找到:
- Timestmap Extractors & Watermark Emitters
- Event Time Processing & Watermarking
- Handling Late Data & Watermarking in Spark
- Spark 文档那部分中的 images 非常有帮助和教育意义。但与此同时,windows' 边界与那些 处理时间 和 not 事件时间戳对齐的方式给我造成了一些困惑.
- 此外,在该可视化中,watermark 似乎每 5 分钟 计算一次,因为这是 5 分钟 的滑动规范=368=]。这是应该多久计算一次 watermark 的决定因素吗?对于不同的 windows(例如
Tumbling
、Sliding
、Session
等),这在 Flink 中如何工作?!
非常感谢您的帮助,如果您知道关于这些概念及其内部工作的任何更好的参考,请告诉我。
@snntrable 回答后更新
If you run a Job with event time semantics, the processing time at the window operators is completely irrelevant
说得对,我理解那部分。一旦你处理了 EVENT_TIME
特征,你就几乎脱离了 semantics/logic 中的 处理时间 。我提出 处理时间 的原因是我对以下关键问题感到困惑,这对我来说仍然是个谜:
windows'边界是如何计算的?!
此外,非常感谢您澄清 out-of-orderness
和 lateness
之间的区别。我正在处理的代码用词不当(从 BoundedOutOfOrdernessTimestampExtractor
继承的 class 的构造函数参数被命名为 maxLatency
):/
考虑到这一点,让我看看关于如何计算水印和何时事件是否正确将被丢弃(或侧面输出):
- 无序分配器
- 当前水印=
max-event-time-seen-so-far - max-out-of-orderness-allowed
- 当前水印=
- 允许迟到
- 当前水印=
max-event-time-seen-so-far - allowed-lateness
- 当前水印=
- 正常流量
- 当前水印=
max-event-time-seen-so-far
- 当前水印=
并且在任何这些情况中,无论事件时间戳 小于或等于[=229]的事件=]到current-watermark
,会被丢弃(侧输出),对吗?!
这带来了一个新问题。您什么时候想使用 out of orderness
而不是 lateness
?由于 current watermark 计算(数学上)在这些情况下可以相同。当你使用 both 时会发生什么(这有意义吗)?!
回到Windows'边界
这对我来说仍然是主要的谜团。鉴于以上所有讨论,让我们重新审视我提供的具体示例,看看 windows' 边界 是如何确定的。假设我们有以下场景(事件的形状为 (value, timestamp)
):
- 接线员在 12:00 PM 开始(即 处理时间)
- 事件按以下顺序到达操作员
- (1, 8:29)
- (5, 8:26)
- (3, 9:48)
- (7, 9:46)
- 我们有一个 TumblingEventTimeWindow,大小为 5 分钟
- window 应用到
DataStream
和BoundedOutOfOrdernessTimestampExtractor
有 2 分钟maxOutOfOrderness
- window 应用到
- 还有,window配置了
allowedLateness
的1分钟
注意:如果您不能同时拥有 out of orderness
和 lateness
或者 没有 有意义,请仅 考虑上面示例中的 out of orderness
。
最后,请您布置windows,其中将分配一些事件给它们和,请指定边界 windows(beginning and end window 的时间戳).我假设边界也由 事件的时间戳决定,但在像这样的具体示例中弄清楚它们有点棘手。
再次非常感谢您的帮助:)
原答案
Watermark: To my understanding, watermark in Flink and Spark Structured Stream is defined as (max-event-timestamp-seen-so-far - allowed-lateness). Any event whose event timestamp is less than or equal to this watermark will be discarded and ignored in result computations.
这是不正确的,可能是造成混淆的原因。 Out-of-Orderness 和 Lateness 在 Flink 中是不同的概念。对于 BoundedOutOfOrdernessTimestampExtractor
,水印是 max-event-timestamp-seen-so-far - max-out-of-orderness
。 Flink 文档 [1].
如果您 运行 具有事件时间语义的作业,则 window 运算符的处理时间完全无关紧要:
- 事件将根据事件时间戳 分配给它们的windows
- 时间 windows 将在水印达到其最大时间戳 (
window end time -1
) 时触发。 - 时间戳早于
current watermark - allowed lateness
的事件将被丢弃或发送到延迟数据端输出 [1]
这意味着,如果您在 12:00pm(处理时间)开始工作并开始从过去摄取数据,水印也将(甚至更远)过去。所以,配置的allowedLateness
是无关紧要的,因为数据没有偶数时间延迟。
另一方面,如果您首先从 12:00pm 中摄取一些数据,然后从 10:00pm 中摄取数据,水印将在您摄取旧数据之前已经前进到 ~12:00pm数据。在这种情况下,来自 10:00pm 的数据将“延迟”。如果它晚于配置的allowedLateness
(默认=0),它被丢弃(默认)或发送到侧面输出(如果配置)[1]。
跟进答案
事件时间 window 的时间表如下:
- a window 中具有时间戳的第一个元素到达 -> 已创建此 window(& 密钥)的状态
watermark >= window_endtime - 1
到达 -> window 被触发(结果被发出),但状态没有被丢弃watermark >= window_endtime + allowed_latenes
到达 -> 状态被丢弃
此 window 的 2. 和 3. 事件迟到了,但在允许的迟到范围内。事件被添加到现有状态,并且 - 默认情况下 - window 在每条记录上触发,发出经过优化的结果。
在 3. 之后,此 window 的事件将被丢弃(或发送到后期输出接收器)。
所以,是的,配置两者是有意义的。乱序决定了 window 何时首次触发,而允许的迟到决定了状态保持多长时间以潜在地更新结果。
关于边界:翻滚事件时间 windows 具有固定长度,跨键对齐并从 unix 纪元开始。空windows,不存在。对于您的示例,这意味着:
- (1, 8:29) 添加到 window (8:25 - 8:29:59:999)
- (5, 8:26) 添加到 window (8:25 - 8:29:59:999)
- (3, 9:48) 添加到 window (9:45 - 9:49:59:999)
- (8:25 - 8:29:59:999) 被触发,因为水印已经前进到 9:48-0:02=9:46,比上一个大window 的时间戳。 window状态也被丢弃,因为watermark已经提前到9:46,也大于window的结束时间+允许迟到(1分钟)
- (7, 9:46) 添加到 window 添加到 window (9:45 - 9:49:59:999)
希望这对您有所帮助。
康斯坦丁