使用Google数据流处理数据时如何报告无效数据?
How to report invalid data while processing data with Google dataflow?
我正在查看documentation and the provided examples to find out how I can report invalid data while processing data with Google's dataflow service。
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
.apply(new SomeTransformation())
.apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
p.run();
除了实际的 in-/output,我还想生成第二个输出文件,其中包含被视为无效的记录(例如缺失数据、格式错误的数据、值过高)。我想对这些记录进行故障排除并单独处理它们。
- 输入:gs://.../input.csv
- 输出:gs://.../output.csv
- 无效记录列表:gs://.../invalid.csv
如何将这些无效记录重定向到单独的输出?
您可以使用 PCollectionTuples 到 return 来自单个转换的多个 PCollection。例如,
TupleTag<String> mainOutput = new TupleTag<>("main");
TupleTag<String> missingData = new TupleTag<>("missing");
TupleTag<String> badValues = new TupleTag<>("bad");
Pipeline p = Pipeline.create(options);
PCollectionTuple all = p
.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
.apply(new SomeTransformation());
all.get(mainOutput)
.apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
all.get(missingData)
.apply(TextIO.Write.named("WriteMissingData").to(...));
...
PCollectionTuples 可以直接从现有的 PCollection 中构建,也可以从具有边输出的 ParDo 操作发出,例如
PCollectionTuple partitioned = input.apply(ParDo
.of(new DoFn<String, String>() {
public void processElement(ProcessContext c) {
if (checkOK(c.element()) {
// Shows up in partitioned.get(mainOutput).
c.output(...);
} else if (hasMissingData(c.element())) {
// Shows up in partitioned.get(missingData).
c.sideOutput(missingData, c.element());
} else {
// Shows up in partitioned.get(badValues).
c.sideOutput(badValues, c.element());
}
}
})
.withOutputTags(mainOutput, TupleTagList.of(missingData).and(badValues)));
请注意,一般来说,各种侧输出不需要具有相同的类型,并且可以将数据多次发送到任意数量的侧输出(而不是我们这里的严格分区)。
你的 SomeTransformation class 看起来像
class SomeTransformation extends PTransform<PCollection<String>,
PCollectionTuple> {
public PCollectionTuple apply(PCollection<String> input) {
// Filter into good and bad data.
PCollectionTuple partitioned = ...
// Process the good data.
PCollection<String> processed =
partitioned.get(mainOutput)
.apply(...)
.apply(...)
...;
// Repackage everything into a new output tuple.
return PCollectionTuple.of(mainOutput, processed)
.and(missingData, partitioned.get(missingData))
.and(badValues, partitioned.get(badValues));
}
}
Robert 关于使用 sideOutputs 的建议很好,但请注意,这只有在您的 ParDos 识别出错误数据时才有效。目前没有办法识别在初始解码期间命中的错误记录(在 Coder.decode 中命中错误)。我们已计划尽快开展工作。
我正在查看documentation and the provided examples to find out how I can report invalid data while processing data with Google's dataflow service。
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
.apply(new SomeTransformation())
.apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
p.run();
除了实际的 in-/output,我还想生成第二个输出文件,其中包含被视为无效的记录(例如缺失数据、格式错误的数据、值过高)。我想对这些记录进行故障排除并单独处理它们。
- 输入:gs://.../input.csv
- 输出:gs://.../output.csv
- 无效记录列表:gs://.../invalid.csv
如何将这些无效记录重定向到单独的输出?
您可以使用 PCollectionTuples 到 return 来自单个转换的多个 PCollection。例如,
TupleTag<String> mainOutput = new TupleTag<>("main");
TupleTag<String> missingData = new TupleTag<>("missing");
TupleTag<String> badValues = new TupleTag<>("bad");
Pipeline p = Pipeline.create(options);
PCollectionTuple all = p
.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
.apply(new SomeTransformation());
all.get(mainOutput)
.apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
all.get(missingData)
.apply(TextIO.Write.named("WriteMissingData").to(...));
...
PCollectionTuples 可以直接从现有的 PCollection 中构建,也可以从具有边输出的 ParDo 操作发出,例如
PCollectionTuple partitioned = input.apply(ParDo
.of(new DoFn<String, String>() {
public void processElement(ProcessContext c) {
if (checkOK(c.element()) {
// Shows up in partitioned.get(mainOutput).
c.output(...);
} else if (hasMissingData(c.element())) {
// Shows up in partitioned.get(missingData).
c.sideOutput(missingData, c.element());
} else {
// Shows up in partitioned.get(badValues).
c.sideOutput(badValues, c.element());
}
}
})
.withOutputTags(mainOutput, TupleTagList.of(missingData).and(badValues)));
请注意,一般来说,各种侧输出不需要具有相同的类型,并且可以将数据多次发送到任意数量的侧输出(而不是我们这里的严格分区)。
你的 SomeTransformation class 看起来像
class SomeTransformation extends PTransform<PCollection<String>,
PCollectionTuple> {
public PCollectionTuple apply(PCollection<String> input) {
// Filter into good and bad data.
PCollectionTuple partitioned = ...
// Process the good data.
PCollection<String> processed =
partitioned.get(mainOutput)
.apply(...)
.apply(...)
...;
// Repackage everything into a new output tuple.
return PCollectionTuple.of(mainOutput, processed)
.and(missingData, partitioned.get(missingData))
.and(badValues, partitioned.get(badValues));
}
}
Robert 关于使用 sideOutputs 的建议很好,但请注意,这只有在您的 ParDos 识别出错误数据时才有效。目前没有办法识别在初始解码期间命中的错误记录(在 Coder.decode 中命中错误)。我们已计划尽快开展工作。