在调试器中调试 apache beam/dataflow?

debug apache beam / dataflow in a debugger?

首先与此高度相关 post ->

我们在生产中有一些这样的代码

@Override
public PDone expand(PCollectionTuple in) {
    final RosterPipelineOptions options = (RosterPipelineOptions) in.getPipeline().getOptions();
    try {
        final Schema schema = new Schema.Parser().parse(PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc"));
        final String schemaStr = new BufferedReader(new InputStreamReader(PractitionerStandardAvroWriter.class.getResourceAsStream("/standardFormat.avsc")))
            .lines().collect(Collectors.joining("\n"));

        final PCollection<Validated<PractitionerStandardOutputDto>> validOutputs = in.get(PractitionerStandardOutputConverter.VALID_OUTPUTS_TAG);
        final PCollection<Validated<PractitionerStandardOutputDto>> invalidOutputs = in.get(PractitionerStandardOutputConverter.INVALID_OUTPUTS_TAG);

        final PCollection<GenericRecord> validRecords = validOutputs.apply(
            "Transform Valid Standard Output into Generic Rows", ParDo.of(new GenericRecordConverter(schemaStr)));
        final PCollection<GenericRecord> invalidRecords = invalidOutputs.apply(
            "Transform Invalid Standard Output into Generic Rows", ParDo.of(new GenericRecordConverter(schemaStr)));

        validRecords
            .setCoder(AvroCoder.of(GenericRecord.class, schema))
            .apply("Write Records", AvroIO.writeGenericRecords(schema)
                .to(options.getValidOutput())
                .withoutSharding());

        final PCollection<String> invalidRows = invalidRecords
            .setCoder(AvroCoder.of(GenericRecord.class, schema))
            .apply("Convert Error Avro to Csv", ParDo.of(new AvroToCsvConvertFn(schemaStr, ",")));
        invalidRows.apply("Write Error Records to Csv",
            TextIO.write().to(options.getInvalidOutput()).withoutSharding());

        return PDone.in(in.getPipeline());
    }
    catch (IOException e) {
        SneakyThrow.sneak(e); return null;
    }
}

但是当我们在测试中调试它时,我们看不到错误。很难通过这段代码来查看发生了什么,因为我认为是 运行 在这段代码之后变形数据,而这段代码只是如何处理传入数据的定义。如果我在这里错了,请纠正我?

两个问题

谢谢, 院长

一般来说,就像你的另一个问题一样,我的建议如下:

  1. 要单步执行您的管道,您可以编写 运行 和 IDE 的单元测试,它将在 DirectRunner 中 运行。这使您可以轻松地逐步完成管道。这不是 运行在 Dataflow 中,而是在本地 - 它仍然很有价值。

您可以在expand中设置断点,这些断点将在管道构建时命中。您可以在 DoFn 的 process 或源的 splitread 中设置断点 - 这些将在管道执行时命中。

  1. 关于编写可调试管道的建议 - 好吧,在这种情况下,我的建议是编写可以单独测试的可组合转换。您可以使用 Beam 拥有的各种测试实用程序来为您的管道编写测试。参见:https://beam.apache.org/documentation/pipelines/test-your-pipeline/

一些最有价值的测试实用程序是 PAssertTestStreamTestPipeline。我建议您查看我共享的页面,看看这些实用程序是否有帮助。


对于您的特定管道,我可能认为您可以将 PTransform 分成更小的部分,然后为每个部分编写简单的单元测试。