Unit testing side outputs

I am unit testing an Apache Beam ParDo function that has one main output and one side output:

public class GetPubsubMessageDoFn extends DoFn<PubsubMessage, PubsubPayload.PubsubPayloadDTO> {

  @ProcessElement
  public void processContext(ProcessContext processContext) {
    PubsubPayload pubsubPayload = new PubsubPayload(processContext.element());
    processContext.output(pubsubPayload.getPayload()); //main output
    processContext.output(ORIGIN_PATH_TUPLE_TAG, GCSUtils.toGSURL(pubsubPayload.getPayload().bucket, pubsubPayload.getPayload().name)); //side output
  }
}

I set up a unit test class to test both the main output and the side output:

 public class GetPubsubMessageDoFnTest {

      private DoFnTester<PubsubMessage, PubsubPayloadDTO> getPubsubMessageDoFn;   
      private Injector injector;
      private final TupleTagList tags = TupleTagList.of(PUBSUB_PAYLOAD_DTO_TUPLE_TAG).and(ORIGIN_PATH_TUPLE_TAG);


      @Before   
      public void setup() {
        injector = Guice.createInjector(new GetPubsubMessageTestModule());
        this.getPubsubMessageDoFn = DoFnTester.of(injector.getInstance(GetPubsubMessageDoFn.class));
        this.getPubsubMessageDoFn.setOutputTags(tags); //Does not compile
      }

  //Tests

According to the documentation, I should be able to set the side output using setOutputTags(tags), but that method does not exist on the DoFnTester class. I'm using the Google Cloud Dataflow dependency version 2.1.0, which uses a subset of Apache Beam's features, but even in the Apache Beam reference documentation for DoFnTester, setOutputTags is not listed (even though it is mentioned in the introduction).

Those methods are not available in 2.1.0. In fact, DoFnTester is being deprecated; see https://issues.apache.org/jira/browse/BEAM-3159

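The recommended approach is to use TestPipeline with the DirectRunner and test the ParDo that wraps your DoFn. You can use TestStream to carefully control the input stream. There is also a good blog post on this topic.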
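As a rough illustration of that approach, here is a minimal sketch of a JUnit 4 test that runs a multi-output ParDo through TestPipeline and checks each output with PAssert. The names SplitFn, MAIN_TAG, and LENGTH_TAG are hypothetical stand-ins for GetPubsubMessageDoFn and its tags, and the sketch assumes the Beam Java SDK, the DirectRunner, and JUnit 4 are on the test classpath.

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;

public class MultiOutputDoFnTest {

  // Hypothetical tags; the anonymous subclasses ({}) keep the element type
  // available so Beam can infer a coder for each output.
  static final TupleTag<String> MAIN_TAG = new TupleTag<String>() {};
  static final TupleTag<Integer> LENGTH_TAG = new TupleTag<Integer>() {};

  // Hypothetical DoFn standing in for the one under test: it emits the
  // upper-cased element to the main output and its length to a tagged output.
  static class SplitFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element().toUpperCase());          // main output
      c.output(LENGTH_TAG, c.element().length());   // additional (side) output
    }
  }

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void emitsMainAndAdditionalOutputs() {
    // withOutputTags makes the ParDo return a PCollectionTuple keyed by tag.
    PCollectionTuple outputs =
        pipeline
            .apply(Create.of("a", "bb"))
            .apply(
                ParDo.of(new SplitFn())
                    .withOutputTags(MAIN_TAG, TupleTagList.of(LENGTH_TAG)));

    // Each output is asserted on independently.
    PAssert.that(outputs.get(MAIN_TAG)).containsInAnyOrder("A", "BB");
    PAssert.that(outputs.get(LENGTH_TAG)).containsInAnyOrder(1, 2);

    pipeline.run().waitUntilFinish();
  }
}
```

In the question's case, the same pattern would presumably apply with PUBSUB_PAYLOAD_DTO_TUPLE_TAG as the main tag and ORIGIN_PATH_TUPLE_TAG in the TupleTagList. For streaming behaviour, the Create.of(...) source could be replaced with a TestStream.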