Apache Beam:为什么 Global Window 中聚合值的时间戳是 9223371950454775?
Apache Beam: why is the timestamp of aggregate value in Global Window 9223371950454775?
我们从 Google Dataflow 1.9 迁移到 Apache Beam 0.6。我们注意到应用全局窗口后时间戳的行为发生了变化。在 Google Dataflow 1.9 中,我们将在 windowing/combine 函数之后在 DoFn 中获得正确的时间戳。现在我们为时间戳获得了一些巨大的价值,例如9223371950454775,Apache Beam 版本中全局窗口的默认行为是否发生变化?
input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
.apply(name(id, "Window"), Window
.<KV<Long, ObjectNode >>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.discardingFiredPanes())
.apply(name(id, "Group By Shard"), GroupByKey.create())
.appy(.....) }
TL;DR:当你合并一堆带时间戳的值时,你需要为聚合结果选择一个时间戳。这个输出时间戳有多个好的答案。在 Dataflow 1.x 中,默认值是输入时间戳中的最小值。根据我们在 Beam 中使用 1.x 的经验,默认值已更改为 window 的末尾。您可以通过将 .withTimestampCombiner(TimestampCombiner.EARLIEST)
添加到 Window
转换来恢复之前的行为。
我会打开包装。让我们使用 @ 符号来配对一个值及其时间戳。只关注一个键,你有时间戳值 v1@t1, v2@t2, ..., 等等。我会坚持你的例子一个原始的 GroupByKey
即使这也适用于组合值的其他方式。所以值的输出可迭代是 [v1, v2, ...] 任意顺序。
以下是时间戳的一些可能性:
- min(t1, t2, ...)
- max(t1, t2, ...)
- 这些元素在window的结尾(忽略输入时间戳)
这些都是正确的。这些都可以作为您的 OutputTimeFn
在 Dataflow 1.x 和 TimestampCombiner
在 Apache Beam 中的选项。
时间戳有不同的解释,它们对不同的事情有用。聚合值的输出时间控制下游水印。因此,选择较早的时间戳可以更多地保留下游水印,而较晚的时间戳可以使其向前移动。
- min(t1, t2, ...) 允许您解压可迭代对象并重新输出 v1@t1
- max(t1, t2, ...) 准确地模拟聚合值完全可用的逻辑时间。 Max 确实往往是最昂贵的,原因与实施细节有关。
- window 结束:
- 模拟此聚合表示 所有 window
数据的事实
- 非常容易理解
- 允许下游水印尽快推进
- 效率极高
出于所有这些原因,我们将默认值从 min 切换到 window 的末尾。
在 Beam 中,您可以通过将 .withTimestampCombiner(TimestampCombiner.EARLIEST)
添加到 Window
转换来恢复之前的行为。在 Dataflow 1.x 中,您可以通过添加 .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())
.
迁移到 Beam 的默认值
另一个技术问题是用户定义的OutputTimeFn
被移除并被TimestampCombiner
枚举取代,所以只有这三个选择,而不是整个API来写你的拥有。
我们从 Google Dataflow 1.9 迁移到 Apache Beam 0.6。我们注意到应用全局窗口后时间戳的行为发生了变化。在 Google Dataflow 1.9 中,我们将在 windowing/combine 函数之后在 DoFn 中获得正确的时间戳。现在我们为时间戳获得了一些巨大的价值,例如9223371950454775,Apache Beam 版本中全局窗口的默认行为是否发生变化?
input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
.apply(name(id, "Window"), Window
.<KV<Long, ObjectNode >>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.discardingFiredPanes())
.apply(name(id, "Group By Shard"), GroupByKey.create())
.appy(.....) }
TL;DR:当你合并一堆带时间戳的值时,你需要为聚合结果选择一个时间戳。这个输出时间戳有多个好的答案。在 Dataflow 1.x 中,默认值是输入时间戳中的最小值。根据我们在 Beam 中使用 1.x 的经验,默认值已更改为 window 的末尾。您可以通过将 .withTimestampCombiner(TimestampCombiner.EARLIEST)
添加到 Window
转换来恢复之前的行为。
我会打开包装。让我们使用 @ 符号来配对一个值及其时间戳。只关注一个键,你有时间戳值 v1@t1, v2@t2, ..., 等等。我会坚持你的例子一个原始的 GroupByKey
即使这也适用于组合值的其他方式。所以值的输出可迭代是 [v1, v2, ...] 任意顺序。
以下是时间戳的一些可能性:
- min(t1, t2, ...)
- max(t1, t2, ...)
- 这些元素在window的结尾(忽略输入时间戳)
这些都是正确的。这些都可以作为您的 OutputTimeFn
在 Dataflow 1.x 和 TimestampCombiner
在 Apache Beam 中的选项。
时间戳有不同的解释,它们对不同的事情有用。聚合值的输出时间控制下游水印。因此,选择较早的时间戳可以更多地保留下游水印,而较晚的时间戳可以使其向前移动。
- min(t1, t2, ...) 允许您解压可迭代对象并重新输出 v1@t1
- max(t1, t2, ...) 准确地模拟聚合值完全可用的逻辑时间。 Max 确实往往是最昂贵的,原因与实施细节有关。
- window 结束:
- 模拟此聚合表示 所有 window 数据的事实
- 非常容易理解
- 允许下游水印尽快推进
- 效率极高
出于所有这些原因,我们将默认值从 min 切换到 window 的末尾。
在 Beam 中,您可以通过将 .withTimestampCombiner(TimestampCombiner.EARLIEST)
添加到 Window
转换来恢复之前的行为。在 Dataflow 1.x 中,您可以通过添加 .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())
.
另一个技术问题是用户定义的OutputTimeFn
被移除并被TimestampCombiner
枚举取代,所以只有这三个选择,而不是整个API来写你的拥有。