当 collection 大小为 0 时,我们如何防止在数据流管道中写入空文件?

How can we prevent empty file write in dataflow pipeline when collection size is 0?

我有一个数据流管道,我正在解析一个文件,如果我得到任何不正确的记录,然后我将它写入 GCS 存储桶,但是当输入文件数据中没有错误时,TextIO 仍然写入空的GCS 存储桶上的文件 header.

那么,如果 PCollection 大小为零,我们如何才能避免这种情况,然后跳过这一步?

errorRecords.apply("WritingErrorRecords", TextIO.write().to(options.getBucketPath())
             .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
             .withoutSharding()
             .withSuffix(".txt")
             .withShardNameTemplate("-SSS")
             .withNumShards(1));
        

TextIO.write() 总是至少写入一个分片,即使它是空的。无论如何,当您写入单个分片时,您可以通过在将 to-be-written 元素作为辅助输入的 DoFn 中手动执行写入来解决此行为,例如

PCollectionView<List<String>> errorRecordsView = errorRecords.apply(
    View.<String>asList());

// Your "main" PCollection is a PCollection with a single input,
// so the DoFn will get invoked exactly once. 
p.apply(Create.of(new String[]{"whatever"}))
 // The side input is your error records.
 .apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(
          @Element String unused,
          OutputReceiver<String> out,
          ProcessContext c) {
        List<String> errors = c.sideInput(errorRecordsView);
        if (!errors.isEmpty()) {
          // Open the file manually and write all the errors to it.
        }
      }
  }).withSideInputs(errorRecordsView);

能够使用本机 Beam 写入来做到这一点是一个合理的要求。我已经提交 https://issues.apache.org/jira/browse/BEAM-14426