动态定义 apache beam 的大小 windows

Dynamically define the size of the apache beam windows

我正在从 PubSub 读取事件,目标是将它们分组到 windows。我想让每个 window 的结尾与每小时的第 0、15、30 和 45 分钟重合。
由于这是一个流作业,它可以随时启动,我想找到一种方法使第一个 window 的大小与接下来的
对齐。 这将是流:

  1. 启动作业
  2. 定义为window_size此时距下一刻钟剩余的时间
  3. 从第一个window的末尾开始,设置window_size = int(15*60)(秒)。

例如:

  1. 启动作业
  2. 现在是 11:18,所以修正 window_size = (11:30-11:18).seconds
  3. 当第一个 window 结束时,设置 window_size = int(15*60)(秒)

在 Google 提供的示例之一中,使用 windowing 的管道定义如下,其中 window_size 是作为用户输入传递的参数:

def expand(self, pcoll):
  return (
          pcoll
          | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
          | "Add Key" >> beam.Map(lambda elem: (None, elem))
          | "Groupby" >> beam.GroupByKey()
          | "Abandon Key" >> beam.MapTuple(lambda _, val: val)
  )

您的用例非常适合 Beam!

首先,有一个基本的概念问题需要澄清:

  • 元素上用于 windowing 的时间戳称为“事件时间”。它们是数据的一部分,描述了流中的某个事件何时发生。
  • 启动和运行作业的时间称为“处理时间”。它不是您数据的一部分。

如果你不把这两者结合或混淆,你会更成功。 Windows 不要将“开始”或“结束”作为作业处理时间的一部分。 Windows 永远“存在”。

使用 FixedWindows 15 分钟即可达到您想要的效果。每个事件都将与其所属的 15 分钟间隔相关联。当您启动您的工作或当一个事件到达处理时不会影响这一点。

更新:添加示例来说明:

假设您如问题中那样在 11:18 启动作业,并假设传入事件大约在同一时间生成。假设出现以下事件,并显示时间戳:

  • A@11:01
  • B@11:18
  • C@11:15
  • D@11:31
  • E@11:29

元素将分配给windows如下:

  • A 在 [11:00,11:15)
  • B 在 [11:15, 11:30)
  • C 在 [11:15, 11:30)
  • D 在 [11:30, 11:45)
  • E 在 [11:15, 11:30)

请注意,window 分配与您开始工作的时间、事件到达的时间或到达的顺序无关。您实际上可以在明天启动它,或者 re-run 对归档数据或甚至不接近顺序的数据启动它,结果将是相同的。事件时间 windowing 是根据数据 .