如何在 Storm 中停止元组处理并执行其他代码

How to stop tuple processing in Storm and execute other code

我是Storm的新手。我正在将它用于一个大学项目。

我创建了我的拓扑结构,其中有一个 Spout 链接到 MySql 数据库,还有两个 Bolt。第一个螺栓,链接到喷口,准备和删除元组不需要的信息;第二,过滤元组。

我在本地模式下工作。

我的问题是: 为什么在 运行ning 拓扑之后,在我的控制台中我看到如下行的输出?

38211 [Thread-14-movie-SPOUT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
67846 [Thread-10-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67846 [Thread-8-cleaning-genre-bolt] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67852 [Thread-10-__acker] INFO  backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@3c270095> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]]
67853 [Thread-8-cleaning-genre-bolt] INFO  backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@38c3d111> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]]
67854 [Thread-13-filtering-genre-BOLT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67855 [Thread-13-filtering-genre-BOLT] INFO  backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6d5c75a9> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]]

我读到在处理最后一个元组之后的这些行被认为是正常的。不是吗?

提交拓扑后如何运行其他代码?例如,我想打印在第二个螺栓中完成的过滤结果,保存在 HashMap 中。 如果我将我的代码放在包含 submitTopology() 方法的行之后,代码是 运行 在元组完成之前。

第二个也是最后一个问题是:为什么在 Storm 的每个示例中,我都在 Spout 中看到

"Thread.sleep(1000)"?

可能与我的第一个问题有关。

我希望我的问题很清楚。 提前致谢!

I read that these lines after the last tuple processed are to be considered normal. Isn't it?

这些只是 INFO 条消息。所以不用担心他们。

If I put my code after the line containing the submitTopology() method, the code is ran before the completion of the tuples.

如果您提交拓扑,拓扑将在后台执行(即多线程)。这是必需的,因为您的拓扑运行 "forever"(直到您明确停止它——或者您的 Java 应用程序终止,因为您处于 运行 本地模式)。

运行 代码 "after your topology finished" 与 Storm 概念不一致,因为 Strom 是一个流式系统并且存在 "no end in processing"(无限输入流,因此处理永远运行)。如果你想处理一个有限的数据集,你可能需要考虑像 Flink 或 Spark 这样的批处理框架。

因此,如果您想在 Storm 中进行这项工作,您需要能够确定所有数据的处理时间。因此,在提交拓扑后,您会在所有数据处理完毕后显式阻塞并等待。

但是,对于您的用例,为什么不只打印最后一个螺栓中的结果?

关于Thread.sleep()我不确定你指的是什么例子。不知道为什么有人要把它投入生产。也许它是为了演示目的而人为地减慢处理速度。