Apache Beam 使用 Scio 保存到 BigQuery 并明确指定 TriggeringFrequency

Apache Beam Saving to BigQuery using Scio and explicitly specifying TriggeringFrequency

我正在使用 Spotify Scio 创建一个由 Pub/Sub 消息触发的 Scala 数据流管道。它从我们的私人 DB 读取信息,然后将信息插入 BigQuery.

问题是:

所以到目前为止我有以下管道:

sc
  .customInput("Job Trigger", inputIO)
  .map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
  .flatten
  .withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
  .groupBy(_.ssoId)
  .map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
  .filter(_.isSuccess)
  .map(_.get)
  .saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)

当我使用 scio api (saveAsBigQuery).

时,我似乎找不到指定触发频率的方法

它只存在于本机 beam api:

BigQueryIO
  .write()
  .withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
  .to(bqTableName)
  .withSchema(getSchema)
  .withCreateDisposition(CREATE_NEVER)
  .withWriteDisposition(WRITE_TRUNCATE)

如果我使用 BigQueryIO,我将不得不使用 sc.pipeline.apply 而不是我当前的管道。

有没有办法以某种方式将 BigQueryIO 集成到我当前的管道或以某种方式在当前管道上指定 withTriggeringFrequency

Scio 目前不支持指定用于将数据加载到 Big Query 的方法。由于这是不可能的,因此自动 STREAMING_INSERTS 用于无界集合,这显然不能支持截断。因此,您需要回退到指定触发频率 (withTriggeringFrequency(...)) 和方法 (withMethod(Method.FILE_LOADS)) 的 Beam BigQueryIO

要将其集成到您的 Scio 管道中,您只需使用 saveAsCustomOutput。 也可以在此处找到示例:https://spotify.github.io/scio/io/Type-Safe-BigQuery#using-type-safe-bigquery-directly-with-beams-io-library