在 Apache Beam 中维护全局状态
Maintaining a global state within Apache Beam
我们有一个 PubSub 主题,事件沉入 BigQuery(尽管特定的数据库在这里几乎无关紧要)。事件可能带有新的未知属性,这些属性最终应该作为单独的 BigQuery 列结束。
所以,基本上我有两个问题:
- 在 Pipeline 中维护全局状态的正确方法是什么(在我的例子中有一组遇到的属性)?
- 一旦遇到新的 属性 并直到执行
ALTER TABLE
,buffering/holding 事件流的好的策略是什么
现在我尝试使用以下(我正在使用 Spotify scio):
rows
.withFixedWindows(Duration.millis(duration))
.withWindow[IntervalWindow]
.swap
.groupByKey
.map { case (window, rowsIterable) =>
val newRows = findNewProperties(rowsIterable)
mutateTableWith(newRows)
rowsIterable
}
.flatMap(id)
.saveAsBigQuery()
但这非常低效,因为我们至少需要将整个 rowsIterable
加载到内存中,甚至遍历它。
我们正在构建完全相同的项目,并且我们在 this approach 之后使用包含架构的刷新侧输入(从 BQ 每隔一段时间刷新一次)。所以基本上:
- 在侧面输入从 BQ 加载模式
- 使用流模式将数据流式传输到 BQ,这样您就可以对插入失败的行执行其他操作(即:当它们有新的未知数据时 属性)
- 将那些失败的保存到其他地方(数据存储?)以便稍后处理它们(例如,在另一个作业中)
- 该恢复作业将发出模式更改,最终将由主管道刷新端输入(步骤 1)加载。
我有一个使用刷新侧输入方法的工作示例here
我们有一个 PubSub 主题,事件沉入 BigQuery(尽管特定的数据库在这里几乎无关紧要)。事件可能带有新的未知属性,这些属性最终应该作为单独的 BigQuery 列结束。
所以,基本上我有两个问题:
- 在 Pipeline 中维护全局状态的正确方法是什么(在我的例子中有一组遇到的属性)?
- 一旦遇到新的 属性 并直到执行
ALTER TABLE
,buffering/holding 事件流的好的策略是什么
现在我尝试使用以下(我正在使用 Spotify scio):
rows
.withFixedWindows(Duration.millis(duration))
.withWindow[IntervalWindow]
.swap
.groupByKey
.map { case (window, rowsIterable) =>
val newRows = findNewProperties(rowsIterable)
mutateTableWith(newRows)
rowsIterable
}
.flatMap(id)
.saveAsBigQuery()
但这非常低效,因为我们至少需要将整个 rowsIterable
加载到内存中,甚至遍历它。
我们正在构建完全相同的项目,并且我们在 this approach 之后使用包含架构的刷新侧输入(从 BQ 每隔一段时间刷新一次)。所以基本上:
- 在侧面输入从 BQ 加载模式
- 使用流模式将数据流式传输到 BQ,这样您就可以对插入失败的行执行其他操作(即:当它们有新的未知数据时 属性)
- 将那些失败的保存到其他地方(数据存储?)以便稍后处理它们(例如,在另一个作业中)
- 该恢复作业将发出模式更改,最终将由主管道刷新端输入(步骤 1)加载。
我有一个使用刷新侧输入方法的工作示例here