How to use 'flatten' correctly in Dataflow
Our pipeline looks like this:
GCS (gz-compressed files) -> ParDo -> BigQuery
I want to use 'flatten' to take multiple files from GCS as the input to my pipeline, but it keeps failing with this error:
Workflow failed. Causes: (5001e5764f46ac2c): BigQuery creation of import job for table "Impressions_05_2015_denormalized_test" in dataset "CPT_XXXX" in project "gdfp-XXXX" failed. Causes: (5001e5764f46a1cf): Error:
Message: Load configuration must specify at least one source URI
HTTP Code: 400
Code:
PCollection<String> file1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_21.gz").withCompressionType(TextIO.CompressionType.GZIP));
PCollection<String> file2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_22.gz").withCompressionType(TextIO.CompressionType.GZIP));
PCollectionList<String> allFiles = PCollectionList.of(file1).and(file2);
PCollection<String> inputRead = allFiles.apply(Flatten.<String>pCollections());
inputRead.apply(ParDo.of(transformation)
        .named(String.format("%s-CPT-transform", type))
        .withSideInputs(views))
    .apply(Write.to(getOutputTable(type))
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withWriteDisposition(WRITE_APPEND)
        .withSchema(schema)
        .named(String.format("%s-BQ-write", type)));
Example job ID: 2015-05-12_19_54_06-10158770219525037626
What am I doing wrong?
This is a bug on our side when writing an empty PCollection to BigQuery. I have reproduced the problem -- we will fix it as soon as possible and follow up here.
If you need to be able to produce a possibly-empty result PCollection, and you can tolerate a single empty row being added to the BigQuery table when the result is empty, you can work around the issue for now with this hack:
// Temporary hack around a temporary bug writing empty PCollections to BigQuery by
// creating a single empty row if a PCollection<TableRow> is empty.
static class AddEmptyRowIfEmpty
    extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
  @Override
  public PCollection<TableRow> apply(PCollection<TableRow> maybeEmpty) {
    // Build a PCollection that contains no elements if 'maybeEmpty' has elements, or
    // exactly one empty TableRow if 'maybeEmpty' is empty.
    final PCollectionView<Iterable<TableRow>> maybeEmptyView =
        maybeEmpty.apply(View.<TableRow>asIterable());
    PCollection<TableRow> singleRowIfMaybeEmptyIsEmpty =
        maybeEmpty.getPipeline()
            .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
            .apply(ParDo.of(new DoFn<Void, TableRow>() {
              @Override
              public void processElement(ProcessContext c) {
                Iterator<TableRow> rows = c.sideInput(maybeEmptyView).iterator();
                if (!rows.hasNext()) {
                  c.output(new TableRow());
                }
              }
            }).withSideInputs(maybeEmptyView));
    // Return a PCollection with at least one element.
    return PCollectionList.of(singleRowIfMaybeEmptyIsEmpty).and(maybeEmpty)
        .apply(Flatten.<TableRow>pCollections());
  }
}
// Then in your pipeline:
...
    .apply(new AddEmptyRowIfEmpty())
    .apply(BigQueryIO.Write.to(...))
Instead of using the suggested hack, which is really crude, I wrote an empty row in the finishBundle() method. This writes one empty row per bundle, but we can live with that until the fix is rolled out. Setting an "id" makes it easy to filter these rows out later.
This workaround/hack is also easier to implement:
@Override
public void finishBundle(Context c) throws Exception {
    TableRow workaroundRow = new TableRow();
    workaroundRow.set("id", "workaround_row");
    c.output(workaroundRow); // Workaround for http://goo.gl/CpBxEf
}
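Since every workaround row carries the fixed "id" value, downstream consumers can drop them again. A minimal sketch of that cleanup logic, using plain `Map`s as a stand-in for `TableRow` so it runs without the Dataflow/BigQuery dependencies (`isWorkaroundRow` and `dropWorkaroundRows` are hypothetical helpers, not part of the pipeline above):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WorkaroundRowFilter {
    // Marker value matching the "id" set in finishBundle() above.
    static final String MARKER = "workaround_row";

    // True for rows that were only emitted to dodge the empty-write bug.
    static boolean isWorkaroundRow(Map<String, Object> row) {
        return MARKER.equals(row.get("id"));
    }

    // Drops the marker rows, keeping real data untouched.
    static List<Map<String, Object>> dropWorkaroundRows(List<Map<String, Object>> rows) {
        List<Map<String, Object>> kept = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            if (!isWorkaroundRow(row)) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> real = new HashMap<>();
        real.put("id", "impression-1");
        Map<String, Object> marker = new HashMap<>();
        marker.put("id", MARKER);
        System.out.println(dropWorkaroundRows(Arrays.asList(real, marker)).size()); // prints 1
    }
}
```

When querying the table directly, the same cleanup is just a `WHERE id != 'workaround_row'` clause.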