Apache Beam:为什么 Global Window 中聚合值的时间戳是 9223371950454775?

Apache Beam: why is the timestamp of aggregate value in Global Window 9223371950454775?

我们从 Google Dataflow 1.9 迁移到 Apache Beam 0.6。我们注意到应用全局窗口后时间戳的行为发生了变化。在 Google Dataflow 1.9 中,我们将在 windowing/combine 函数之后在 DoFn 中获得正确的时间戳。现在我们为时间戳获得了一些巨大的价值,例如9223371950454775,Apache Beam 版本中全局窗口的默认行为是否发生变化?

input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
      .apply(name(id, "Window"), Window
          .<KV<Long, ObjectNode >>into(new GlobalWindows())
          .triggering(Repeatedly.forever(
              AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(1))))
          .discardingFiredPanes())
      .apply(name(id, "Group By Shard"), GroupByKey.create())
      .appy(.....) }

TL;DR:当你合并一堆带时间戳的值时,你需要为聚合结果选择一个时间戳。这个输出时间戳有多个好的答案。在 Dataflow 1.x 中,默认值是输入时间戳中的最小值。根据我们在 Beam 中使用 1.x 的经验,默认值已更改为 window 的末尾。您可以通过将 .withTimestampCombiner(TimestampCombiner.EARLIEST) 添加到 Window 转换来恢复之前的行为。


我会打开包装。让我们使用 @ 符号来配对一个值及其时间戳。只关注一个键,你有时间戳值 v1@t1, v2@t2, ..., 等等。我会坚持你的例子一个原始的 GroupByKey 即使这也适用于组合值的其他方式。所以值的输出可迭代是 [v1, v2, ...] 任意顺序。

以下是时间戳的一些可能性:

  • min(t1, t2, ...)
  • max(t1, t2, ...)
  • 这些元素在window的结尾(忽略输入时间戳)

这些都是正确的。这些都可以作为您的 OutputTimeFn 在 Dataflow 1.x 和 TimestampCombiner 在 Apache Beam 中的选项。

时间戳有不同的解释,它们对不同的事情有用。聚合值的输出时间控制下游水印。因此,选择较早的时间戳可以更多地保留下游水印,而较晚的时间戳可以使其向前移动。

  • min(t1, t2, ...) 允许您解压可迭代对象并重新输出 v1@t1
  • max(t1, t2, ...) 准确地模拟聚合值完全可用的逻辑时间。 Max 确实往往是最昂贵的,原因与实施细节有关。
  • window 结束:
    • 模拟此聚合表示 所有 window
    • 数据的事实
    • 非常容易理解
    • 允许下游水印尽快推进
    • 效率极高

出于所有这些原因,我们将默认值从 min 切换到 window 的末尾。

在 Beam 中,您可以通过将 .withTimestampCombiner(TimestampCombiner.EARLIEST) 添加到 Window 转换来恢复之前的行为。在 Dataflow 1.x 中,您可以通过添加 .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()).

迁移到 Beam 的默认值

另一个技术问题是用户定义的OutputTimeFn被移除并被TimestampCombiner枚举取代,所以只有这三个选择,而不是整个API来写你的拥有。