在调试器中调试 apache beam/dataflow?
debug apache beam / dataflow in a debugger?
首先与此高度相关 post ->
我们在生产中有一些这样的代码
@Override
public PDone expand(PCollectionTuple in) {
final RosterPipelineOptions options = (RosterPipelineOptions) in.getPipeline().getOptions();
try {
final Schema schema = new Schema.Parser().parse(PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc"));
final String schemaStr = new BufferedReader(new InputStreamReader(PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc")))
.lines().collect(Collectors.joining("\n"));
final PCollection<Validated<PractitionerStandardOutputDto>> validOutputs = in.get(PractitionerStandardOutputConverter.VALID_OUTPUTS_TAG);
final PCollection<Validated<PractitionerStandardOutputDto>> invalidOutputs = in.get(PractitionerStandardOutputConverter.INVALID_OUTPUTS_TAG);
final PCollection<GenericRecord> validRecords = validOutputs.apply(
"Transform Valid Standard Output into Generic Rows", ParDo.of(new GenericRecordConverter(schemaStr)));
final PCollection<GenericRecord> invalidRecords = invalidOutputs.apply(
"Transform Invalid Standard Output into Generic Rows", ParDo.of(new GenericRecordConverter(schemaStr)));
validRecords
.setCoder(AvroCoder.of(GenericRecord.class, schema))
.apply("Write Records", AvroIO.writeGenericRecords(schema)
.to(options.getValidOutput())
.withoutSharding());
final PCollection<String> invalidRows = invalidRecords
.setCoder(AvroCoder.of(GenericRecord.class, schema))
.apply("Convert Error Avro to Csv", ParDo.of(new AvroToCsvConvertFn(schemaStr, ",")));
invalidRows.apply("Write Error Records to Csv",
TextIO.write().to(options.getInvalidOutput()).withoutSharding());
return PDone.in(in.getPipeline());
}
catch (IOException e) {
SneakyThrow.sneak(e); return null;
}
}
但是当我们在测试中调试它时,我们看不到错误。很难通过这段代码来查看发生了什么,因为我认为是 运行 在这段代码之后变形数据,而这段代码只是如何处理传入数据的定义。如果我在这里错了,请纠正我?
两个问题
- 这是编写可调试的 apache-beam/dataflow 代码的最佳方式吗?我们可以单步调试并轻松查看错误所在?
- 有没有其他方法可以轻松调试它,因为我怀疑 'real execution' 在应用东西时发生在该方法之后?
谢谢, 院长
一般来说,就像你的另一个问题一样,我的建议如下:
- 要单步执行您的管道,您可以编写 运行 和 IDE 的单元测试,它将在 DirectRunner 中 运行。这使您可以轻松地逐步完成管道。这不是 运行在 Dataflow 中,而是在本地 - 它仍然很有价值。
您可以在expand
中设置断点,这些断点将在管道构建时命中。您可以在 DoFn 的 process
或源的 split
、read
中设置断点 - 这些将在管道执行时命中。
- 关于编写可调试管道的建议 - 好吧,在这种情况下,我的建议是编写可以单独测试的可组合转换。您可以使用 Beam 拥有的各种测试实用程序来为您的管道编写测试。参见:https://beam.apache.org/documentation/pipelines/test-your-pipeline/
一些最有价值的测试实用程序是 PAssert
、TestStream
和 TestPipeline
。我建议您查看我共享的页面,看看这些实用程序是否有帮助。
对于您的特定管道,我可能认为您可以将 PTransform 分成更小的部分,然后为每个部分编写简单的单元测试。