在 Apache Beam 中维护全局状态

Maintaining a global state within Apache Beam

我们有一个 PubSub 主题,事件沉入 BigQuery(尽管特定的数据库在这里几乎无关紧要)。事件可能带有新的未知属性,这些属性最终应该作为单独的 BigQuery 列结束。

所以,基本上我有两个问题:

  1. 在 Pipeline 中维护全局状态的正确方法是什么(在我的例子中有一组遇到的属性)?
  2. 一旦遇到新的 属性 并直到执行 ALTER TABLE
  3. ,buffering/holding 事件流的好的策略是什么

现在我尝试使用以下(我正在使用 Spotify scio):

rows
  .withFixedWindows(Duration.millis(duration))
  .withWindow[IntervalWindow]
  .swap
  .groupByKey
  .map { case (window, rowsIterable) =>
    val newRows = findNewProperties(rowsIterable)
    mutateTableWith(newRows)
    rowsIterable
  }
  .flatMap(id)
  .saveAsBigQuery()

但这非常低效,因为我们至少需要将整个 rowsIterable 加载到内存中,甚至遍历它。

我们正在构建完全相同的项目,并且我们在 this approach 之后使用包含架构的刷新侧输入(从 BQ 每隔一段时间刷新一次)。所以基本上:

  1. 在侧面输入从 BQ 加载模式
  2. 使用流模式将数据流式传输到 BQ,这样您就可以对插入失败的行执行其他操作(即:当它们有新的未知数据时 属性)
  3. 将那些失败的保存到其他地方(数据存储?)以便稍后处理它们(例如,在另一个作业中)
  4. 该恢复作业将发出模​​式更改,最终将由主管道刷新端输入(步骤 1)加载。

我有一个使用刷新侧输入方法的工作示例here