是否有任何 Java API 可以知道拓扑何时准备好从 Spout 读取第一条消息
Is there any Java API to know when topology is ready for reading first message from Spout
我们的 Apache Storm 拓扑使用 KafkaSpout 侦听来自 Kafka 的消息,并在执行大量 mapping/reducing/enrichment/aggregation 等操作后最终将数据插入 Cassandra。还有另一个 kafka 输入,如果拓扑找到响应,我们会在其中接收用户对数据的查询,然后将其发送到第三个 kafka 主题。现在我们想使用 Junit 编写 E2E 测试,我们可以直接以编程方式将数据插入拓扑,然后通过插入用户查询消息,我们可以在第三点断言我们的查询收到的响应是正确的。
为了实现这一点,我们想到启动 EmbeddedKafka 和 CassandraUnit,然后用它们替换实际的 Kafka 和 Cassandra,然后我们可以在这个单一 Junit 测试的上下文中启动拓扑。
在开始实际测试之前,我们创建拓扑并将其提交到LocalCluster。它在不同的线程上启动拓扑并从 Before 出来并开始执行我们的测试。到那个时候,拓扑还没有准备好,因为它需要一些时间才能准备好进行处理。是否有任何 java API 可以告诉我们拓扑何时准备好进行处理(意味着准备好从 Spout 读取第一条消息)?
这取决于你说"ready for processing"时的意思。
如果您为 LocalCluster 启用时间模拟,则可以使用 Time.advanceClusterTime
逐步推进时间。如果您在提交拓扑后调用此方法,它只会 return 一旦集群大部分处于空闲状态。参见例如https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java#L233.
如果您愿意用存根(例如 FixedTupleSpout)替换 spout,您可以使用Testing.completeTopology
等待拓扑完成处理您设置存根以发出的所有元组。
另一种等待拓扑处理完一些元组的方法是将一些消息放入 Kafka,启动拓扑,然后让测试线程轮询 Cassandra 以查看您期望的消息是否已经通过.这样,您可以在测试线程中设置超时,如果在一定秒数内未满足条件,则测试失败。您可以为此使用像 Awaitility 这样的实用程序 https://github.com/awaitility/awaitility,或者只编写您自己的轮询逻辑。
如果 "ready for processing" 是其他意思,请更详细地描述您的意思。
我们的 Apache Storm 拓扑使用 KafkaSpout 侦听来自 Kafka 的消息,并在执行大量 mapping/reducing/enrichment/aggregation 等操作后最终将数据插入 Cassandra。还有另一个 kafka 输入,如果拓扑找到响应,我们会在其中接收用户对数据的查询,然后将其发送到第三个 kafka 主题。现在我们想使用 Junit 编写 E2E 测试,我们可以直接以编程方式将数据插入拓扑,然后通过插入用户查询消息,我们可以在第三点断言我们的查询收到的响应是正确的。
为了实现这一点,我们想到启动 EmbeddedKafka 和 CassandraUnit,然后用它们替换实际的 Kafka 和 Cassandra,然后我们可以在这个单一 Junit 测试的上下文中启动拓扑。
在开始实际测试之前,我们创建拓扑并将其提交到LocalCluster。它在不同的线程上启动拓扑并从 Before 出来并开始执行我们的测试。到那个时候,拓扑还没有准备好,因为它需要一些时间才能准备好进行处理。是否有任何 java API 可以告诉我们拓扑何时准备好进行处理(意味着准备好从 Spout 读取第一条消息)?
这取决于你说"ready for processing"时的意思。
如果您为 LocalCluster 启用时间模拟,则可以使用 Time.advanceClusterTime
逐步推进时间。如果您在提交拓扑后调用此方法,它只会 return 一旦集群大部分处于空闲状态。参见例如https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java#L233.
如果您愿意用存根(例如 FixedTupleSpout)替换 spout,您可以使用Testing.completeTopology
等待拓扑完成处理您设置存根以发出的所有元组。
另一种等待拓扑处理完一些元组的方法是将一些消息放入 Kafka,启动拓扑,然后让测试线程轮询 Cassandra 以查看您期望的消息是否已经通过.这样,您可以在测试线程中设置超时,如果在一定秒数内未满足条件,则测试失败。您可以为此使用像 Awaitility 这样的实用程序 https://github.com/awaitility/awaitility,或者只编写您自己的轮询逻辑。
如果 "ready for processing" 是其他意思,请更详细地描述您的意思。