Apache Beam 使用 Scio 保存到 BigQuery 并明确指定 TriggeringFrequency
Apache Beam Saving to BigQuery using Scio and explicitly specifying TriggeringFrequency
我正在使用 Spotify Scio
创建一个由 Pub/Sub
消息触发的 Scala 数据流管道。它从我们的私人 DB
读取信息,然后将信息插入 BigQuery
.
问题是:
- 我需要删除之前的数据
- 为此,我需要使用写入处置
WRITE_TRUNCATE
- 但是,作业自动注册为流式处理,因此出现以下错误:
WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
- 所以我需要手动将管道更改为
Batch
管道,指定触发频率。
所以到目前为止我有以下管道:
sc
.customInput("Job Trigger", inputIO)
.map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
.flatten
.withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
.groupBy(_.ssoId)
.map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
.filter(_.isSuccess)
.map(_.get)
.saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)
当我使用 scio
api (saveAsBigQuery
).
时,我似乎找不到指定触发频率的方法
它只存在于本机 beam
api:
BigQueryIO
.write()
.withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
.to(bqTableName)
.withSchema(getSchema)
.withCreateDisposition(CREATE_NEVER)
.withWriteDisposition(WRITE_TRUNCATE)
如果我使用 BigQueryIO
,我将不得不使用 sc.pipeline.apply
而不是我当前的管道。
有没有办法以某种方式将 BigQueryIO
集成到我当前的管道或以某种方式在当前管道上指定 withTriggeringFrequency
?
Scio 目前不支持指定用于将数据加载到 Big Query 的方法。由于这是不可能的,因此自动 STREAMING_INSERTS
用于无界集合,这显然不能支持截断。因此,您需要回退到指定触发频率 (withTriggeringFrequency(...)
) 和方法 (withMethod(Method.FILE_LOADS)
) 的 Beam BigQueryIO
。
要将其集成到您的 Scio 管道中,您只需使用 saveAsCustomOutput
。
也可以在此处找到示例:https://spotify.github.io/scio/io/Type-Safe-BigQuery#using-type-safe-bigquery-directly-with-beams-io-library
我正在使用 Spotify Scio
创建一个由 Pub/Sub
消息触发的 Scala 数据流管道。它从我们的私人 DB
读取信息,然后将信息插入 BigQuery
.
问题是:
- 我需要删除之前的数据
- 为此,我需要使用写入处置
WRITE_TRUNCATE
- 但是,作业自动注册为流式处理,因此出现以下错误:
WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
- 所以我需要手动将管道更改为
Batch
管道,指定触发频率。
所以到目前为止我有以下管道:
sc
.customInput("Job Trigger", inputIO)
.map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
.flatten
.withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
.groupBy(_.ssoId)
.map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
.filter(_.isSuccess)
.map(_.get)
.saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)
当我使用 scio
api (saveAsBigQuery
).
它只存在于本机 beam
api:
BigQueryIO
.write()
.withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
.to(bqTableName)
.withSchema(getSchema)
.withCreateDisposition(CREATE_NEVER)
.withWriteDisposition(WRITE_TRUNCATE)
如果我使用 BigQueryIO
,我将不得不使用 sc.pipeline.apply
而不是我当前的管道。
有没有办法以某种方式将 BigQueryIO
集成到我当前的管道或以某种方式在当前管道上指定 withTriggeringFrequency
?
Scio 目前不支持指定用于将数据加载到 Big Query 的方法。由于这是不可能的,因此自动 STREAMING_INSERTS
用于无界集合,这显然不能支持截断。因此,您需要回退到指定触发频率 (withTriggeringFrequency(...)
) 和方法 (withMethod(Method.FILE_LOADS)
) 的 Beam BigQueryIO
。
要将其集成到您的 Scio 管道中,您只需使用 saveAsCustomOutput
。
也可以在此处找到示例:https://spotify.github.io/scio/io/Type-Safe-BigQuery#using-type-safe-bigquery-directly-with-beams-io-library