通用云数据流模式——有更好的方法吗?

Common Cloud Dataflow pattern - is there a better way?

我们发现自己经常在 Dataflow 中使用以下模式:

  1. 从 BigQuery TableRow
  2. 执行键提取 ParDo
  3. 对 1
  4. 的结果执行 GroupByKey
  5. 对 2
  6. 的结果执行扁平化 ParDo

Dataflow 中是否有一个操作可以一次性实现这一点(至少从 API 的角度来看)?

我看过 Combine 操作,但它似乎更适合在计算值时使用,例如sums/averages等

你的问题没有太多细节,我只能给出一般性的建议。

您可以创建一个 PTransform 将上述模式组合成一个复合转换。这允许您将经常使用的操作放在一个单一的可重用组件中。

下面的代码应该让您明白我的意思:

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;


class ExtractKeyFn extends DoFn<TableRow, KV<String, TableRow>> {

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = c.element();
        Object key = row.get("key");
        if (key != null) {
            c.output(KV.of(key.toString(), row));
        }
    }
}

class CompositeTransform extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {

    public CompositeTransform(String name) {
        super(name);
    }

    public static CompositeTransform named(String name) {
        return new CompositeTransform(name);
    }

    @Override
    public PCollection<TableRow> apply(PCollection<TableRow> input) {
        return input.apply(ParDo.named("parse").of(new ExtractKeyFn()))
             .apply(GroupByKey.create())
             // potentially more transformations
            .apply(Values.create()) // get only the values ( because we have a kv )
            .apply(Flatten.iterables()); // flatten them out
    }
}

public class Main {

    public static void run(PipelineOptions options) {
        Pipeline p = Pipeline.create(options);

        // read input
        p.apply(BigQueryIO.Read.from("inputTable...").named("inputFromBigQuery"))

         // apply fancy transform
         .apply(CompositeTransform.named("FancyKeyGroupAndFlatten"))

         // write output
         .apply(BigQueryIO.Write.to("outputTable...").named("outputToBigQuery"));


        p.run();
    }
}