PAssert not executing and test always passing with an unbounded source

I am writing a simple test case to check whether GenerateSequence emits elements according to the configured duration. Source code:

    pipeline.apply("Source", buildSource(options))
            .apply("Windowing", defineWindow(options))
            .apply("processingState", ParDo.of(new JdbcIoPreProcessing(duration)));

    protected PTransform<? super PBegin, PCollection<Long>> buildSource(PipelineDefaultOptions pipelineDefaultOptions) {
        return GenerateSequence.from(0).withRate(1, Duration.standardSeconds(options.getPollingDuration()));
    }

    protected Window<Long> defineWindow(PipelineDefaultOptions pipelineDefaultOptions) {
        OrdersPipelineOptions options = (OrdersPipelineOptions) pipelineDefaultOptions;
        return Window.into(FixedWindows.of(Duration.standardSeconds(options.getPollingDuration())));
    }

Test case:

    Pipeline pipeline = TestPipeline.create()
            .enableAbandonedNodeEnforcement(false);
    pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
    PCollection<Long> actual = pipeline.apply(application.buildSource(options));
    PAssert.that(actual).containsInAnyOrder(10L);
    PipelineResult result = pipeline.run();
    PipelineResult.State state = result.waitUntilFinish(Duration.standardSeconds(10));
    result.cancel();

Is this PAssert correct, or how should PAssert be used in this situation? I saw in the documentation that a window has to be provided.

I tried both an IntervalWindow and a BoundedWindow:

    IntervalWindow windowFirstEvent = new IntervalWindow(
            baseTime.plus(Duration.standardSeconds(0)),
            baseTime.plus(Duration.standardSeconds(options.getPollingDuration())));

    BoundedWindow window = new BoundedWindow() {
        @Override
        public Instant maxTimestamp() {
            return baseTime.plus(Duration.standardSeconds(100));
        }
    };

    PAssert.that(actual).inWindow(windowFirstEvent).containsInAnyOrder(10L);

but I got a cast exception: cannot be cast to org.apache.beam.sdk.transforms.windowing.GlobalWindow.

Thanks in advance.

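You should use TestStream when testing streaming pipelines. TestStream is an invaluable streaming source for tests: it lets you simulate the advancement of both wall-clock (processing) time and the watermark. See this blog post and the documentation.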
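On the window that the question passes to PAssert: with FixedWindows and no offset, an element is assigned to the window that starts at its timestamp rounded down to a multiple of the window size, so an IntervalWindow used in an assertion has to line up with those boundaries. The arithmetic can be sketched in plain Java. This is only an illustration: the class and method names are hypothetical, it uses java.time instead of Beam's Joda types to avoid any dependency, and it assumes non-negative timestamps and a zero window offset.

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowBounds {
  /** Start of the epoch-aligned fixed window that contains ts (assumes ts >= epoch). */
  public static Instant windowStart(Instant ts, Duration size) {
    long millis = ts.toEpochMilli();
    // Round the timestamp down to the nearest multiple of the window size.
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
  }

  /** Exclusive end of the same window. */
  public static Instant windowEnd(Instant ts, Duration size) {
    return windowStart(ts, size).plus(size);
  }

  public static void main(String[] args) {
    Duration size = Duration.ofSeconds(10);
    Instant ts = Instant.ofEpochSecond(25);
    // prints 1970-01-01T00:00:20Z .. 1970-01-01T00:00:30Z
    System.out.println(windowStart(ts, size) + " .. " + windowEnd(ts, size));
  }
}
```

So with a 10-second window, an element stamped at second 25 belongs to the window [20 s, 30 s), and that is the interval an assertion on it would need to name.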

For your case, a TestStream-based source could look something like this (this solution is untested, so feel free to edit it afterwards):


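    TestStream<Long> buildSource(OrdersPipelineOptions options) {
        // Generic type arguments must be reference types, so Long rather than int.
        TestStream.Builder<Long> testStreamBuilder = TestStream.create(VarLongCoder.of());
        for (long i = 0; i < MAX_TIME; ++i) {
            // The builder is immutable: keep the instance returned by each call.
            // Elements added this way take the current watermark as their timestamp.
            testStreamBuilder = testStreamBuilder
                    .addElements(i)
                    .advanceProcessingTime(
                            Duration.standardSeconds(options.getPollingDuration()));
        }

        return testStreamBuilder.advanceWatermarkToInfinity();
    }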

    pipeline.apply("Source", buildSource(options))
            .apply("Windowing", defineWindow(options))
            .apply("processingState", ParDo.of(new JdbcIoPreProcessing(duration)));
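To connect this back to the PAssert part of the question, here is a minimal sketch of a windowed assertion on top of a TestStream. It is untested and makes several assumptions: JUnit 4 and the direct runner are on the classpath, WINDOW_SIZE is a hypothetical stand-in for options.getPollingDuration(), and the class name and element timestamps are made up for illustration. In the question's test the assertion is made on the source output before any Window.into is applied, so that collection is still in the global window, which is probably why inWindow with an IntervalWindow ended in the GlobalWindow cast error. Here the assertion is placed after the windowing step instead.

```java
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

public class WindowedSourceTest {
  // Hypothetical stand-in for options.getPollingDuration().
  private static final Duration WINDOW_SIZE = Duration.standardSeconds(10);

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void elementsLandInExpectedWindows() {
    Instant baseTime = new Instant(0L);

    // Explicit event-time timestamps decide which fixed window each element falls into.
    TestStream<Long> source =
        TestStream.create(VarLongCoder.of())
            .addElements(
                TimestampedValue.of(0L, baseTime),
                TimestampedValue.of(1L, baseTime.plus(Duration.standardSeconds(1))))
            .advanceWatermarkTo(baseTime.plus(WINDOW_SIZE))
            .addElements(TimestampedValue.of(10L, baseTime.plus(Duration.standardSeconds(12))))
            .advanceWatermarkToInfinity();

    // Assert on the windowed collection, not on the raw source output.
    PCollection<Long> actual =
        pipeline.apply(source).apply(Window.<Long>into(FixedWindows.of(WINDOW_SIZE)));

    IntervalWindow first = new IntervalWindow(baseTime, baseTime.plus(WINDOW_SIZE));
    IntervalWindow second =
        new IntervalWindow(baseTime.plus(WINDOW_SIZE), baseTime.plus(WINDOW_SIZE.multipliedBy(2)));

    PAssert.that(actual).inWindow(first).containsInAnyOrder(0L, 1L);
    PAssert.that(actual).inWindow(second).containsInAnyOrder(10L);

    // The stream ends once the watermark reaches infinity, so the run terminates.
    pipeline.run().waitUntilFinish();
  }
}
```

Because the TestStream is finite, the pipeline finishes on its own and the assertions are evaluated, so there is no need for a non-blocking run followed by cancel().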