Google Dataflow: write to multiple tables based on input
I have logs that I want to push to Google BigQuery, and I am trying to build the whole pipeline with Google Dataflow. The logs vary in structure and fall into four different types. In my pipeline I read the logs from Pub/Sub, parse them, and write them to a BigQuery table. The table a log should be written to depends on one parameter inside the log. The problem is that I am stuck on how to change the TableName for BigQueryIO.Write at runtime.
You can use side outputs.
https://cloud.google.com/dataflow/model/par-do#emitting-to-side-outputs-in-your-dofn
The sample code below reads a BigQuery table and splits it into 3 different PCollections. Each of them is then sent to a different Pub/Sub topic (these could just as well be different BigQuery tables).
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<TableRow> weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));

// One tag per output; the anonymous subclasses let Dataflow recover the generic type.
final TupleTag<String> readings2010 = new TupleTag<String>() {};
final TupleTag<String> readings2000plus = new TupleTag<String>() {};
final TupleTag<String> readingsOld = new TupleTag<String>() {};

PCollectionTuple collectionTuple = weatherData.apply(ParDo.named("tablerow2string")
        .withOutputTags(readings2010, TupleTagList.of(readings2000plus).and(readingsOld))
        .of(new DoFn<TableRow, String>() {
            @Override
            public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
                if (c.element().getF().get(2).getV().equals("2010")) {
                    // Main output.
                    c.output(c.element().toString());
                } else if (Integer.parseInt(c.element().getF().get(2).getV().toString()) > 2000) {
                    c.sideOutput(readings2000plus, c.element().toString());
                } else {
                    c.sideOutput(readingsOld, c.element().toString());
                }
            }
        }));

collectionTuple.get(readings2010)
        .apply(PubsubIO.Write.named("WriteToPubsub1").topic("projects/fh-dataflow/topics/bq2pubsub-topic1"));
collectionTuple.get(readings2000plus)
        .apply(PubsubIO.Write.named("WriteToPubsub2").topic("projects/fh-dataflow/topics/bq2pubsub-topic2"));
collectionTuple.get(readingsOld)
        .apply(PubsubIO.Write.named("WriteToPubsub3").topic("projects/fh-dataflow/topics/bq2pubsub-topic3"));

p.run();
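Applied to the original question, the same pattern works: tag each of the four log types in the DoFn, then attach a separate BigQueryIO.Write (each with a fixed table name) to each tagged PCollection. The dispatch on the log's type parameter is plain Java and can live in a small helper; here is a minimal sketch of that routing logic, where the log-type names and table names are purely hypothetical placeholders:

```java
import java.util.HashMap;
import java.util.Map;

public class LogTableRouter {
    // Hypothetical mapping from a log's "type" parameter to a BigQuery table name.
    private static final Map<String, String> TABLE_BY_TYPE = new HashMap<>();
    static {
        TABLE_BY_TYPE.put("access", "myproject:logs.access_logs");
        TABLE_BY_TYPE.put("error",  "myproject:logs.error_logs");
        TABLE_BY_TYPE.put("audit",  "myproject:logs.audit_logs");
        TABLE_BY_TYPE.put("debug",  "myproject:logs.debug_logs");
    }

    /** Returns the destination table for a log type, falling back to a catch-all table. */
    public static String tableFor(String logType) {
        return TABLE_BY_TYPE.getOrDefault(logType, "myproject:logs.unknown_logs");
    }

    public static void main(String[] args) {
        System.out.println(tableFor("access"));
        System.out.println(tableFor("something-else"));
    }
}
```

Inside the DoFn you would call such a helper (or just branch on the type field directly, as in the weather example above) to pick which TupleTag to emit to; the side-output tags are what ultimately decide which BigQueryIO.Write sink receives each log.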