增量加载和 BigQuery
Incremental loading and BigQuery
我正在编写增量加载管道以将数据从 MySQL 加载到 BigQuery 并使用 Google Cloud Datastore 作为元数据存储库。
我现在的流水线是这样写的:
PCollection<TableRow> tbRows =
pipeline.apply("Read from MySQL",
JdbcIO.<TableRow>read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create("com.mysql.cj.jdbc.Driver", connectionConfig)
.withUsername(username)
.withPassword(password)
.withQuery(query).withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow())))
.setCoder(NullableCoder.of(TableRowJsonCoder.of()));
tbRows.apply("Write to BigQuery",
BigQueryIO.writeTableRows().withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).to(outputTable));
tbRows.apply("Getting timestamp column",
MapElements.into(TypeDescriptors.strings())
.via((final TableRow row) -> (String) row.get(fieldName)))
.setCoder(NullableCoder.of(StringUtf8Coder.of())).apply("Max", Max.globally())
.apply("Updating Datastore", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(final ProcessContext c) {
DatastoreConnector.udpate(table, c.element());
}
}));
我面临的问题是,当 BigQuery 写入步骤失败时,数据存储仍在更新,有什么方法可以等待 BigQuery 写入完成后再更新数据存储?
谢谢!
目前无法在与 BigQueryIO.writeTableRows()
相同的管道中完成此操作,因为它会产生终端输出 (PDone
)。不过我有一些建议。
- 我怀疑 BigQuery 写入失败的情况很少见。在这种情况下,您可以从辅助 job/process.
中删除相应的 Datastore 数据吗?
- 您是否考虑过更适合写入增量更改数据的 CDC 解决方案。例如,请参阅数据流模板 here.
我正在编写增量加载管道以将数据从 MySQL 加载到 BigQuery 并使用 Google Cloud Datastore 作为元数据存储库。
我现在的流水线是这样写的:
PCollection<TableRow> tbRows =
pipeline.apply("Read from MySQL",
JdbcIO.<TableRow>read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create("com.mysql.cj.jdbc.Driver", connectionConfig)
.withUsername(username)
.withPassword(password)
.withQuery(query).withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow())))
.setCoder(NullableCoder.of(TableRowJsonCoder.of()));
tbRows.apply("Write to BigQuery",
BigQueryIO.writeTableRows().withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).to(outputTable));
tbRows.apply("Getting timestamp column",
MapElements.into(TypeDescriptors.strings())
.via((final TableRow row) -> (String) row.get(fieldName)))
.setCoder(NullableCoder.of(StringUtf8Coder.of())).apply("Max", Max.globally())
.apply("Updating Datastore", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(final ProcessContext c) {
DatastoreConnector.udpate(table, c.element());
}
}));
我面临的问题是,当 BigQuery 写入步骤失败时,数据存储仍在更新,有什么方法可以等待 BigQuery 写入完成后再更新数据存储?
谢谢!
目前无法在与 BigQueryIO.writeTableRows()
相同的管道中完成此操作,因为它会产生终端输出 (PDone
)。不过我有一些建议。
- 我怀疑 BigQuery 写入失败的情况很少见。在这种情况下,您可以从辅助 job/process. 中删除相应的 Datastore 数据吗?
- 您是否考虑过更适合写入增量更改数据的 CDC 解决方案。例如,请参阅数据流模板 here.