Apache Beam - 与无限 PCollection 的集成测试

Apache Beam - Integration test with unbounded PCollection

我们正在为 Apache Beam 管道构建集成测试,并且 运行 正在解决一些问题。有关上下文,请参见下文...

关于我们管道的详细信息:

目前的测试方法:

这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行。

我们目前遇到的问题是,当我们 运行 我们的管道阻塞时。我们正在使用 DirectRunnerpipeline.run() 而不是 pipeline.run().waitUntilFinish()),但测试似乎在 运行 管道后挂起。因为这是一个无界的 PCollection(在流模式下 运行ning),管道不会终止,因此不会到达它之后的任何代码。

所以,我有几个问题:

1) 有没有办法 运行 一个管道,然后稍后手动停止它?

2) 有没有办法异步 运行 管道?理想情况下,它会启动管道(然后会不断轮询 Pub/Sub 数据),然后继续执行负责发布到 Pub/Sub.

的代码

3) 这种集成测试管道的方法是否合理,或者是否有更好的方法可能更直接?任何 info/guidance 这里将不胜感激。

让我知道是否可以提供任何额外的 code/context - 谢谢!

通过将 isBlockOnRun 管道选项设置为 false,您可以 运行 使用 DirectRunner 异步管道。只要您保持对返回的 PipelineResult 的引用可用,对该结果调用 cancel() 应该会停止管道。

对于你的第三个问题,你的设置似乎是合理的。但是,如果您想对您的管道进行更小规模的测试(需要更少的组件),您可以将所有处理逻辑封装在自定义 PTransform 中。此 PTransform 应从输入源获取已完全解析的输入,并生成尚未为输出接收器解析的输出。

完成后,您可以使用 Create(通常不会执行触发)或 TestStream(可能,取决于您构建 TestStream 的方式) DirectRunner 生成有限数量的输入数据,将此处理 PTransform 应用于 PCollection,并在输出 PCollection 上使用 PAssert 来验证管道生成了您期望的输出。

有关测试的更多信息,B​​eam 网站在 Programming Guide and a blog post 关于使用 TestStream.

测试管道中提供了有关这些测试样式的信息