将不同的值写入 Apache Beam 中的不同 BigQuery 表

Writing different values to different BigQuery tables in Apache Beam

假设我有一个 PCollection<Foo>,我想将它写入多个 BigQuery table,为每个 Foo 选择一个可能不同的 table。

我如何使用 Apache Beam 做到这一点 BigQueryIO API?

这可以使用 Apache Beam 最近添加到 BigQueryIO 的功能来实现。

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {  
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));

根据输入 PCollection<Foo> 是有界还是无界,这将在后台创建多个 BigQuery 导入作业(每个 table 一个或多个,具体取决于数据量),或者它将使用 BigQuery 流式插入 API.

最灵活的 API 版本使用 DynamicDestinations,它允许您使用不同的模式将不同的值写入不同的 table,甚至允许您使用侧输入来自所有这些计算中管道的其余部分。

此外,BigQueryIO 已被重构为许多可重用的转换,您可以自己组合这些转换来实现更复杂的用例 - 请参阅 files in the source directory

此功能将包含在 Apache Beam 的第一个 stable 版本和下一个版本的 Dataflow SDK 中(它将基于 Apache Beam 的第一个 stable 版本) .现在,您可以通过 运行 您的管道对来自 github.

的 Beam 快照使用它

从 Beam 2.12.0 开始,Python SDK 中也提供了此功能。它被标记为实验性的,因此您必须通过 --experiments use_beam_bq_sink 才能启用它。你会这样做:

def get_table_name(element):
  if meets_some_condition(element):
    return 'mytablename1'
  else:
    return 'mytablename2'


p = beam.Pipeline(...)

my_input_pcoll = p | ReadInMyPCollection()

my_input_pcoll | beam.io.gcp.bigquery.WriteToBigQuery(table=get_table_name)

新接收器支持许多其他选项,您可以review in the pydoc