如何在 Dataflow 中正确使用 'flatten'

How to use 'flatten' correctly in Dataflow

我们的管道是这样的:

GCS(gz 压缩文件)-> ParDo -> BigQuery

我想使用 'flatten' 从 GCS 中获取多个文件作为我管道的输入。但它一直因错误而烦恼:

Workflow failed. Causes: (5001e5764f46ac2c): BigQuery creation of import job for table "Impressions_05_2015_denormalized_test" in dataset "CPT_XXXX" in project "gdfp-XXXX" failed. Causes: (5001e5764f46a1cf): Error:
 Message: Load configuration must specify at least one source URI
 HTTP Code: 400

代码:

PCollection<String> file1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_21.gz").withCompressionType(TextIO.CompressionType.GZIP));
        PCollection<String> file2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_22.gz").withCompressionType(TextIO.CompressionType.GZIP));
        PCollectionList<String> allFiles = PCollectionList.of(file1).and(file2);
        PCollection<String> inputRead = allFiles.apply(Flatten.<String>pCollections());
inputRead.apply(ParDo.of(transformation)
                .named(String.format("%s-CPT-transform", type))
                .withSideInputs(views))
                .apply(Write.to(getOutputTable(type))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_APPEND)
                        .withSchema(schema)
                        .named(String.format("%s-BQ-write", type)));

示例作业 ID:2015-05-12_19_54_06-10158770219525037626

我做错了什么?

在将空 PCollection 写入 BigQuery 时,我们这边存在错误。我已经重现了这个问题——我们会尽快解决它并在此处跟进。

如果您需要能够创建一个可能为空的结果 PCollection,并且您可以在结果为空的情况下处理添加到 BigQuery table 的单个空行,您可以暂时现在使用此 hack 解决此问题:

// Temporary hack around a temporary bug writing empty PCollections to BigQuery by
// creating a single empty row if a PCollection<TableRow> is empty.
static class AddEmptyRowIfEmpty
        extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {

    @Override
    public PCollection<TableRow> apply(PCollection<TableRow> maybeEmpty) {

        // Build a PCollection that contains no elements if 'maybeEmpty' has elements, or
        // exactly one empty TableRow if 'maybeEmpty' is empty.
        final PCollectionView<Iterable<TableRow>> maybeEmptyView = maybeEmpty.apply(
                View.<TableRow>asIterable());
        PCollection<TableRow> singleRowIfMaybeEmptyIsEmpty =
                maybeEmpty.getPipeline()
                    .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
                    .apply(ParDo.of(
                        new DoFn<Void, TableRow>() {
                          @Override
                          public void processElement(ProcessContext c) {
                            Iterator<TableRow> rows = c.sideInput(maybeEmptyView).iterator();
                            if (!rows.hasNext()) {
                              c.output(new TableRow());
                            }
                          }
                        }).withSideInputs(maybeEmptyView));

        // Return a PCollection with at least one element.
        return PCollectionList.of(singleRowIfMaybeEmptyIsEmpty).and(maybeEmpty)
                .apply(Flatten.<TableRow>pCollections());

    }
}

// Then in your pipeline:
...
.apply(new AddEmptyRowIfEmpty())
.apply(BigQueryIO.Write(...))

我没有使用建议的 hack,这真的很粗糙,而是在 finishBundle() 方法中写了一个空行。这将为每个包写入 1 个空行,但我们可以接受它,直到推出修复程序。设置 "id" 可以更容易地在以后过滤掉这些行。

此外,这个 workaround/hack 更容易实现:

@Override
public void finishBundle(Context c) throws Exception {
   TableRow workaroundRow = new TableRow();
   workaroundRow.set("id", "workaround_row");
   c.output(workaroundRow); //Workaround to http://goo.gl/CpBxEf
}