如何在终止风暴拓扑之前调用特定方法
How to call a particular method before killing a storm topology
如何在终止风暴拓扑之前调用特定方法。
我在 storm 中创建了一个拓扑,我想在拓扑被杀死之前调用特定的方法。
在 Storm 框架中是否有任何预定义的覆盖或任何方法可用于执行此操作。
提前致谢:)
没有这样的东西...
作为一种解决方法,您可以在终止拓扑之前 deactivate
拓扑。这确保 Spout.deactivate()
被调用。
如果你需要在bolts调用一个方法,使用Spout.deactivate()
发送一个"notification tuple"(不包含要处理的数据)遍历整个拓扑。在每个螺栓中,如果收到 "notification tuple",则调用您的特殊方法。
此外,这个 "notification tuple" 必须由螺栓转发给它的所有前任。您需要确保 "notification tuples" 被发送到每个螺栓的所有并行执行器。为此,使用专用的 "notification stream" 并通过 allGrouping()
将每个螺栓订阅到此流(除了常规输入流之外)。在每个螺栓中,您需要检查元组是否为通知元组(例如通过 Tuple.getSourceStreamId()
)
清理完成后,最后可以kill拓扑。
如何在终止风暴拓扑之前调用特定方法。
我在 storm 中创建了一个拓扑,我想在拓扑被杀死之前调用特定的方法。
在 Storm 框架中是否有任何预定义的覆盖或任何方法可用于执行此操作。
提前致谢:)
没有这样的东西...
作为一种解决方法,您可以在终止拓扑之前 deactivate
拓扑。这确保 Spout.deactivate()
被调用。
如果你需要在bolts调用一个方法,使用Spout.deactivate()
发送一个"notification tuple"(不包含要处理的数据)遍历整个拓扑。在每个螺栓中,如果收到 "notification tuple",则调用您的特殊方法。
此外,这个 "notification tuple" 必须由螺栓转发给它的所有前任。您需要确保 "notification tuples" 被发送到每个螺栓的所有并行执行器。为此,使用专用的 "notification stream" 并通过 allGrouping()
将每个螺栓订阅到此流(除了常规输入流之外)。在每个螺栓中,您需要检查元组是否为通知元组(例如通过 Tuple.getSourceStreamId()
)
清理完成后,最后可以kill拓扑。