当 collection 大小为 0 时,我们如何防止在数据流管道中写入空文件?
How can we prevent empty file write in dataflow pipeline when collection size is 0?
我有一个数据流管道,我正在解析一个文件,如果我得到任何不正确的记录,然后我将它写入 GCS 存储桶,但是当输入文件数据中没有错误时,TextIO 仍然写入空的GCS 存储桶上的文件 header.
那么,如果 PCollection 大小为零,我们如何才能避免这种情况,然后跳过这一步?
errorRecords.apply("WritingErrorRecords", TextIO.write().to(options.getBucketPath())
.withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
.withoutSharding()
.withSuffix(".txt")
.withShardNameTemplate("-SSS")
.withNumShards(1));
TextIO.write()
总是至少写入一个分片,即使它是空的。无论如何,当您写入单个分片时,您可以通过在将 to-be-written 元素作为辅助输入的 DoFn 中手动执行写入来解决此行为,例如
PCollectionView<List<String>> errorRecordsView = errorRecords.apply(
View.<String>asList());
// Your "main" PCollection is a PCollection with a single input,
// so the DoFn will get invoked exactly once.
p.apply(Create.of(new String[]{"whatever"}))
// The side input is your error records.
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(
@Element String unused,
OutputReceiver<String> out,
ProcessContext c) {
List<String> errors = c.sideInput(errorRecordsView);
if (!errors.isEmpty()) {
// Open the file manually and write all the errors to it.
}
}
}).withSideInputs(errorRecordsView);
能够使用本机 Beam 写入来做到这一点是一个合理的要求。我已经提交 https://issues.apache.org/jira/browse/BEAM-14426
我有一个数据流管道,我正在解析一个文件,如果我得到任何不正确的记录,然后我将它写入 GCS 存储桶,但是当输入文件数据中没有错误时,TextIO 仍然写入空的GCS 存储桶上的文件 header.
那么,如果 PCollection 大小为零,我们如何才能避免这种情况,然后跳过这一步?
errorRecords.apply("WritingErrorRecords", TextIO.write().to(options.getBucketPath())
.withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
.withoutSharding()
.withSuffix(".txt")
.withShardNameTemplate("-SSS")
.withNumShards(1));
TextIO.write()
总是至少写入一个分片,即使它是空的。无论如何,当您写入单个分片时,您可以通过在将 to-be-written 元素作为辅助输入的 DoFn 中手动执行写入来解决此行为,例如
PCollectionView<List<String>> errorRecordsView = errorRecords.apply(
View.<String>asList());
// Your "main" PCollection is a PCollection with a single input,
// so the DoFn will get invoked exactly once.
p.apply(Create.of(new String[]{"whatever"}))
// The side input is your error records.
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(
@Element String unused,
OutputReceiver<String> out,
ProcessContext c) {
List<String> errors = c.sideInput(errorRecordsView);
if (!errors.isEmpty()) {
// Open the file manually and write all the errors to it.
}
}
}).withSideInputs(errorRecordsView);
能够使用本机 Beam 写入来做到这一点是一个合理的要求。我已经提交 https://issues.apache.org/jira/browse/BEAM-14426