将 Side input 应用于 Apache Beam 中的 BigQueryIO.read 操作

Apply Side input to BigQueryIO.read operation in Apache Beam

有没有办法在 Apache Beam 中将辅助输入应用于 BigQueryIO.read() 操作。

例如,我在 PCollection 中有一个值,我想在查询中使用它从 BigQuery table 获取数据。这可能使用侧面输入吗?或者在这种情况下应该使用其他东西吗?

我在类似的情况下使用了 NestedValueProvider,但我想我们只能在某个值取决于我的运行时值时使用它。或者我可以在这里使用同样的东西吗?如有不妥请指正

我试过的代码:

Bigquery bigQueryClient = start_pipeline.newBigQueryClient(options.as(BigQueryOptions.class)).build();
    Tabledata tableRequest = bigQueryClient.tabledata();

PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new DoFn<String,TableRow>(){
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException
    {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();

        for(TableRow row:table.getRows())
        {
            c.output(row);
        }
    }
    }).withSideInputs(bqDataView));

我得到的错误是:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize BeamTest.StarterPipeline@86b455
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
    at BeamTest.StarterPipeline.main(StarterPipeline.java:158)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.Bigquery$Tabledata
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 4 more

Beam模型目前还不能很好地支持这种依赖数据的操作

一种方法是编写您自己的 DoFn 代码,它接收侧面输入并直接连接到 BQ。不幸的是,这不会给你任何并行性,因为 DoFn 会 运行 完全在同一个线程上。

一旦 Beam 支持 Splittable DoFn,情况就不同了。


在目前的情况下,您需要使用 BQ client library 来添加查询 BQ 的代码,就好像您不在 Beam 管道中一样。

鉴于您问题中的代码,关于如何实现它的粗略想法如下:

class ReadDataDoFn extends DoFn<String,TableRow>(){
    private Tabledata tableRequest;

    private Bigquery bigQueryClient;

    private Bigquery createBigQueryClientWithinDoFn() {
        // I'm not sure how you'd implement this, but you had the right idea
    }

    @Setup
    public void setup() {
        bigQueryClient = createBigQueryClientWithinDoFn(); 
        tableRequest = bigQueryClient.tabledata();
    }
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException
    {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();

        for(TableRow row:table.getRows())
        {
            c.output(row);
        }
    }
}

PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new ReadDataDoFn()));