动态定义 apache beam 的大小 windows
Dynamically define the size of the apache beam windows
我正在从 PubSub 读取事件,目标是将它们分组到 windows。我想让每个 window 的结尾与每小时的第 0、15、30 和 45 分钟重合。
由于这是一个流作业,它可以随时启动,我想找到一种方法使第一个 window 的大小与接下来的
对齐。
这将是流:
- 启动作业
- 定义为
window_size
此时距下一刻钟剩余的时间
- 从第一个window的末尾开始,设置
window_size = int(15*60)
(秒)。
例如:
- 启动作业
- 现在是 11:18,所以修正
window_size = (11:30-11:18).seconds
- 当第一个 window 结束时,设置
window_size = int(15*60)
(秒)
在 Google 提供的示例之一中,使用 windowing 的管道定义如下,其中 window_size
是作为用户输入传递的参数:
def expand(self, pcoll):
return (
pcoll
| "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
| "Add Key" >> beam.Map(lambda elem: (None, elem))
| "Groupby" >> beam.GroupByKey()
| "Abandon Key" >> beam.MapTuple(lambda _, val: val)
)
您的用例非常适合 Beam!
首先,有一个基本的概念问题需要澄清:
- 元素上用于 windowing 的时间戳称为“事件时间”。它们是数据的一部分,描述了流中的某个事件何时发生。
- 启动和运行作业的时间称为“处理时间”。它不是您数据的一部分。
如果你不把这两者结合或混淆,你会更成功。 Windows 不要将“开始”或“结束”作为作业处理时间的一部分。 Windows 永远“存在”。
使用 FixedWindows
15 分钟即可达到您想要的效果。每个事件都将与其所属的 15 分钟间隔相关联。当您启动您的工作或当一个事件到达处理时不会影响这一点。
更新:添加示例来说明:
假设您如问题中那样在 11:18 启动作业,并假设传入事件大约在同一时间生成。假设出现以下事件,并显示时间戳:
- A@11:01
- B@11:18
- C@11:15
- D@11:31
- E@11:29
元素将分配给windows如下:
- A 在 [11:00,11:15)
- B 在 [11:15, 11:30)
- C 在 [11:15, 11:30)
- D 在 [11:30, 11:45)
- E 在 [11:15, 11:30)
请注意,window 分配与您开始工作的时间、事件到达的时间或到达的顺序无关。您实际上可以在明天启动它,或者 re-run 对归档数据或甚至不接近顺序的数据启动它,结果将是相同的。事件时间 windowing 是根据数据 .
我正在从 PubSub 读取事件,目标是将它们分组到 windows。我想让每个 window 的结尾与每小时的第 0、15、30 和 45 分钟重合。
由于这是一个流作业,它可以随时启动,我想找到一种方法使第一个 window 的大小与接下来的
对齐。
这将是流:
- 启动作业
- 定义为
window_size
此时距下一刻钟剩余的时间 - 从第一个window的末尾开始,设置
window_size = int(15*60)
(秒)。
例如:
- 启动作业
- 现在是 11:18,所以修正
window_size = (11:30-11:18).seconds
- 当第一个 window 结束时,设置
window_size = int(15*60)
(秒)
在 Google 提供的示例之一中,使用 windowing 的管道定义如下,其中 window_size
是作为用户输入传递的参数:
def expand(self, pcoll):
return (
pcoll
| "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
| "Add Key" >> beam.Map(lambda elem: (None, elem))
| "Groupby" >> beam.GroupByKey()
| "Abandon Key" >> beam.MapTuple(lambda _, val: val)
)
您的用例非常适合 Beam!
首先,有一个基本的概念问题需要澄清:
- 元素上用于 windowing 的时间戳称为“事件时间”。它们是数据的一部分,描述了流中的某个事件何时发生。
- 启动和运行作业的时间称为“处理时间”。它不是您数据的一部分。
如果你不把这两者结合或混淆,你会更成功。 Windows 不要将“开始”或“结束”作为作业处理时间的一部分。 Windows 永远“存在”。
使用 FixedWindows
15 分钟即可达到您想要的效果。每个事件都将与其所属的 15 分钟间隔相关联。当您启动您的工作或当一个事件到达处理时不会影响这一点。
更新:添加示例来说明:
假设您如问题中那样在 11:18 启动作业,并假设传入事件大约在同一时间生成。假设出现以下事件,并显示时间戳:
- A@11:01
- B@11:18
- C@11:15
- D@11:31
- E@11:29
元素将分配给windows如下:
- A 在 [11:00,11:15)
- B 在 [11:15, 11:30)
- C 在 [11:15, 11:30)
- D 在 [11:30, 11:45)
- E 在 [11:15, 11:30)
请注意,window 分配与您开始工作的时间、事件到达的时间或到达的顺序无关。您实际上可以在明天启动它,或者 re-run 对归档数据或甚至不接近顺序的数据启动它,结果将是相同的。事件时间 windowing 是根据数据 .