将不同的值写入 Apache Beam 中的不同 BigQuery 表
Writing different values to different BigQuery tables in Apache Beam
假设我有一个 PCollection<Foo>
,我想将它写入多个 BigQuery table,为每个 Foo
选择一个可能不同的 table。
我如何使用 Apache Beam 做到这一点 BigQueryIO
API?
这可以使用 Apache Beam 最近添加到 BigQueryIO
的功能来实现。
PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<Foo> value) {
Foo foo = value.getValue();
// Also available: value.getWindow(), getTimestamp(), getPane()
String tableSpec = ...;
String tableDescription = ...;
return new TableDestination(tableSpec, tableDescription);
}
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
@Override
public TableRow apply(Foo foo) {
return ...;
}
}).withSchema(...));
根据输入 PCollection<Foo>
是有界还是无界,这将在后台创建多个 BigQuery 导入作业(每个 table 一个或多个,具体取决于数据量),或者它将使用 BigQuery 流式插入 API.
最灵活的 API 版本使用 DynamicDestinations
,它允许您使用不同的模式将不同的值写入不同的 table,甚至允许您使用侧输入来自所有这些计算中管道的其余部分。
此外,BigQueryIO 已被重构为许多可重用的转换,您可以自己组合这些转换来实现更复杂的用例 - 请参阅 files in the source directory。
此功能将包含在 Apache Beam 的第一个 stable 版本和下一个版本的 Dataflow SDK 中(它将基于 Apache Beam 的第一个 stable 版本) .现在,您可以通过 运行 您的管道对来自 github.
的 Beam 快照使用它
从 Beam 2.12.0 开始,Python SDK 中也提供了此功能。它被标记为实验性的,因此您必须通过 --experiments use_beam_bq_sink
才能启用它。你会这样做:
def get_table_name(element):
if meets_some_condition(element):
return 'mytablename1'
else:
return 'mytablename2'
p = beam.Pipeline(...)
my_input_pcoll = p | ReadInMyPCollection()
my_input_pcoll | beam.io.gcp.bigquery.WriteToBigQuery(table=get_table_name)
新接收器支持许多其他选项,您可以review in the pydoc
假设我有一个 PCollection<Foo>
,我想将它写入多个 BigQuery table,为每个 Foo
选择一个可能不同的 table。
我如何使用 Apache Beam 做到这一点 BigQueryIO
API?
这可以使用 Apache Beam 最近添加到 BigQueryIO
的功能来实现。
PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<Foo> value) {
Foo foo = value.getValue();
// Also available: value.getWindow(), getTimestamp(), getPane()
String tableSpec = ...;
String tableDescription = ...;
return new TableDestination(tableSpec, tableDescription);
}
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
@Override
public TableRow apply(Foo foo) {
return ...;
}
}).withSchema(...));
根据输入 PCollection<Foo>
是有界还是无界,这将在后台创建多个 BigQuery 导入作业(每个 table 一个或多个,具体取决于数据量),或者它将使用 BigQuery 流式插入 API.
最灵活的 API 版本使用 DynamicDestinations
,它允许您使用不同的模式将不同的值写入不同的 table,甚至允许您使用侧输入来自所有这些计算中管道的其余部分。
此外,BigQueryIO 已被重构为许多可重用的转换,您可以自己组合这些转换来实现更复杂的用例 - 请参阅 files in the source directory。
此功能将包含在 Apache Beam 的第一个 stable 版本和下一个版本的 Dataflow SDK 中(它将基于 Apache Beam 的第一个 stable 版本) .现在,您可以通过 运行 您的管道对来自 github.
的 Beam 快照使用它从 Beam 2.12.0 开始,Python SDK 中也提供了此功能。它被标记为实验性的,因此您必须通过 --experiments use_beam_bq_sink
才能启用它。你会这样做:
def get_table_name(element):
if meets_some_condition(element):
return 'mytablename1'
else:
return 'mytablename2'
p = beam.Pipeline(...)
my_input_pcoll = p | ReadInMyPCollection()
my_input_pcoll | beam.io.gcp.bigquery.WriteToBigQuery(table=get_table_name)
新接收器支持许多其他选项,您可以review in the pydoc