Apache Beam - 与无限 PCollection 的集成测试
Apache Beam - Integration test with unbounded PCollection
我们正在为 Apache Beam 管道构建集成测试,并且 运行 正在解决一些问题。有关上下文,请参见下文...
关于我们管道的详细信息:
- 我们使用
PubsubIO
作为我们的数据源(无界 PCollection
)
- 中间转换包括自定义
CombineFn
和非常简单的 windowing/triggering 策略
- 我们最终的转换是
JdbcIO
,使用org.neo4j.jdbc.Driver
写入Neo4j
目前的测试方法:
- 运行 Google Cloud 的 Pub/Sub 机器上的模拟器,测试运行正在
- 构建内存中的 Neo4j 数据库并将其 URI 传递到我们的管道选项中
- 运行 管道通过调用
OurPipeline.main(TestPipeline.convertToArgs(options)
- 使用 Google Cloud 的 Java Pub/Sub 客户端库将消息发布到测试主题(使用 Pub/Sub 模拟器),
PubsubIO
将从中读取
- 数据应该流经管道并最终到达我们的 Neo4j 内存实例
- 就此数据在 Neo4j 中的存在做出简单断言
这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行。
我们目前遇到的问题是,当我们 运行 我们的管道阻塞时。我们正在使用 DirectRunner
和 pipeline.run()
( 而不是 pipeline.run().waitUntilFinish()
),但测试似乎在 运行 管道后挂起。因为这是一个无界的 PCollection
(在流模式下 运行ning),管道不会终止,因此不会到达它之后的任何代码。
所以,我有几个问题:
1) 有没有办法 运行 一个管道,然后稍后手动停止它?
2) 有没有办法异步 运行 管道?理想情况下,它会启动管道(然后会不断轮询 Pub/Sub 数据),然后继续执行负责发布到 Pub/Sub.
的代码
3) 这种集成测试管道的方法是否合理,或者是否有更好的方法可能更直接?任何 info/guidance 这里将不胜感激。
让我知道是否可以提供任何额外的 code/context - 谢谢!
通过将 isBlockOnRun
管道选项设置为 false
,您可以 运行 使用 DirectRunner
异步管道。只要您保持对返回的 PipelineResult
的引用可用,对该结果调用 cancel()
应该会停止管道。
对于你的第三个问题,你的设置似乎是合理的。但是,如果您想对您的管道进行更小规模的测试(需要更少的组件),您可以将所有处理逻辑封装在自定义 PTransform
中。此 PTransform
应从输入源获取已完全解析的输入,并生成尚未为输出接收器解析的输出。
完成后,您可以使用 Create
(通常不会执行触发)或 TestStream
(可能,取决于您构建 TestStream
的方式) DirectRunner
生成有限数量的输入数据,将此处理 PTransform
应用于 PCollection
,并在输出 PCollection
上使用 PAssert
来验证管道生成了您期望的输出。
有关测试的更多信息,Beam 网站在 Programming Guide and a blog post 关于使用 TestStream
.
测试管道中提供了有关这些测试样式的信息
我们正在为 Apache Beam 管道构建集成测试,并且 运行 正在解决一些问题。有关上下文,请参见下文...
关于我们管道的详细信息:
- 我们使用
PubsubIO
作为我们的数据源(无界PCollection
) - 中间转换包括自定义
CombineFn
和非常简单的 windowing/triggering 策略 - 我们最终的转换是
JdbcIO
,使用org.neo4j.jdbc.Driver
写入Neo4j
目前的测试方法:
- 运行 Google Cloud 的 Pub/Sub 机器上的模拟器,测试运行正在
- 构建内存中的 Neo4j 数据库并将其 URI 传递到我们的管道选项中
- 运行 管道通过调用
OurPipeline.main(TestPipeline.convertToArgs(options)
- 使用 Google Cloud 的 Java Pub/Sub 客户端库将消息发布到测试主题(使用 Pub/Sub 模拟器),
PubsubIO
将从中读取 - 数据应该流经管道并最终到达我们的 Neo4j 内存实例
- 就此数据在 Neo4j 中的存在做出简单断言
这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行。
我们目前遇到的问题是,当我们 运行 我们的管道阻塞时。我们正在使用 DirectRunner
和 pipeline.run()
( 而不是 pipeline.run().waitUntilFinish()
),但测试似乎在 运行 管道后挂起。因为这是一个无界的 PCollection
(在流模式下 运行ning),管道不会终止,因此不会到达它之后的任何代码。
所以,我有几个问题:
1) 有没有办法 运行 一个管道,然后稍后手动停止它?
2) 有没有办法异步 运行 管道?理想情况下,它会启动管道(然后会不断轮询 Pub/Sub 数据),然后继续执行负责发布到 Pub/Sub.
的代码3) 这种集成测试管道的方法是否合理,或者是否有更好的方法可能更直接?任何 info/guidance 这里将不胜感激。
让我知道是否可以提供任何额外的 code/context - 谢谢!
通过将 isBlockOnRun
管道选项设置为 false
,您可以 运行 使用 DirectRunner
异步管道。只要您保持对返回的 PipelineResult
的引用可用,对该结果调用 cancel()
应该会停止管道。
对于你的第三个问题,你的设置似乎是合理的。但是,如果您想对您的管道进行更小规模的测试(需要更少的组件),您可以将所有处理逻辑封装在自定义 PTransform
中。此 PTransform
应从输入源获取已完全解析的输入,并生成尚未为输出接收器解析的输出。
完成后,您可以使用 Create
(通常不会执行触发)或 TestStream
(可能,取决于您构建 TestStream
的方式) DirectRunner
生成有限数量的输入数据,将此处理 PTransform
应用于 PCollection
,并在输出 PCollection
上使用 PAssert
来验证管道生成了您期望的输出。
有关测试的更多信息,Beam 网站在 Programming Guide and a blog post 关于使用 TestStream
.