在 Apache Storm 拓扑开始时仅执行一次方法
Execute a method only once at start of an Apache Storm topology
如果我有一个简单的 Apache Storm 拓扑结构,在两个单独的节点上有一个喷口(设置为 2 的并行度)运行。在任何元组处理开始之前,如何在拓扑开始时编写一个 运行 一次且仅一次的方法?
singleton/staticclass 的任何实现或单独的同步方法都不起作用,因为这两个实例 运行 在不同的节点上运行。
也许我可以使用一些 Storm 方法来决定我是否是第一个被实例化的 Spout,然后才 运行?我尝试使用 getThisTaskId() 和 getThisWorkerTasks() 方法,但没有成功。
注意:2 的并行性是为了让事情简单化。一个解决方案应该适用于任意数量的 nodes/workers.
编辑:想到一个更简单的解决方案。如果有帮助,我会在下面留下原始答案。
您可以使用 TopologyContext.getThisTaskIndex
来执行此操作。如果你的 spout open
方法 运行 只有 TopologyContext.getThisTaskIndex == 0
的代码,那么你的代码只会 运行 一次,在任何元组被发出之前。
如果 运行 这段代码的 worker 崩溃了,当任务索引为 0 的 spout 实例重启时,代码将再次 运行。为了解决这个问题,您可以使用 Zookeeper 来存储应该在重启后继续存在的状态,例如一旦一次性代码具有 运行,就在 Zookeeper 中放置一个标志,并让 spout open
检查在 运行 代码之前未设置标志。
您可以使用 TopologyContext.getStormId
获取一个常量唯一字符串来标识拓扑,这样您就可以判断该标志是由该拓扑还是先前的部署设置的。
原回答:
运行 某些代码仅在部署拓扑时执行一次的最简单方法是在提交拓扑时调用代码。您可以在使用 TopologyBuilder
连接拓扑的同时调用一次性代码。这只会得到 运行 一次。缺点是它将 运行 在你调用 storm jar
的机器上。
如果您出于某种原因不能这样做或需要 运行 来自其中一个工作节点的代码,则 Storm 中没有任何内置的东西可以让您这样做。没有这种机制的原因是它需要工作 JVM 之间的额外协调,我认为没有人需要这样的东西。
您最好的选择可能是查看 Zookeeper/Curator 来进行协调(参见 https://curator.apache.org/curator-recipes/index.html)。这应该允许您在集群 运行 中仅使一个工作人员成为您的代码。您必须考虑如果工作人员选择 运行 您的代码 crashes/stalls.
会发生什么
Storm 已经使用 Zookeeper 进行协调,因此您只需连接到该集群即可。
如果我有一个简单的 Apache Storm 拓扑结构,在两个单独的节点上有一个喷口(设置为 2 的并行度)运行。在任何元组处理开始之前,如何在拓扑开始时编写一个 运行 一次且仅一次的方法?
singleton/staticclass 的任何实现或单独的同步方法都不起作用,因为这两个实例 运行 在不同的节点上运行。
也许我可以使用一些 Storm 方法来决定我是否是第一个被实例化的 Spout,然后才 运行?我尝试使用 getThisTaskId() 和 getThisWorkerTasks() 方法,但没有成功。
注意:2 的并行性是为了让事情简单化。一个解决方案应该适用于任意数量的 nodes/workers.
编辑:想到一个更简单的解决方案。如果有帮助,我会在下面留下原始答案。
您可以使用 TopologyContext.getThisTaskIndex
来执行此操作。如果你的 spout open
方法 运行 只有 TopologyContext.getThisTaskIndex == 0
的代码,那么你的代码只会 运行 一次,在任何元组被发出之前。
如果 运行 这段代码的 worker 崩溃了,当任务索引为 0 的 spout 实例重启时,代码将再次 运行。为了解决这个问题,您可以使用 Zookeeper 来存储应该在重启后继续存在的状态,例如一旦一次性代码具有 运行,就在 Zookeeper 中放置一个标志,并让 spout open
检查在 运行 代码之前未设置标志。
您可以使用 TopologyContext.getStormId
获取一个常量唯一字符串来标识拓扑,这样您就可以判断该标志是由该拓扑还是先前的部署设置的。
原回答:
运行 某些代码仅在部署拓扑时执行一次的最简单方法是在提交拓扑时调用代码。您可以在使用 TopologyBuilder
连接拓扑的同时调用一次性代码。这只会得到 运行 一次。缺点是它将 运行 在你调用 storm jar
的机器上。
如果您出于某种原因不能这样做或需要 运行 来自其中一个工作节点的代码,则 Storm 中没有任何内置的东西可以让您这样做。没有这种机制的原因是它需要工作 JVM 之间的额外协调,我认为没有人需要这样的东西。
您最好的选择可能是查看 Zookeeper/Curator 来进行协调(参见 https://curator.apache.org/curator-recipes/index.html)。这应该允许您在集群 运行 中仅使一个工作人员成为您的代码。您必须考虑如果工作人员选择 运行 您的代码 crashes/stalls.
会发生什么Storm 已经使用 Zookeeper 进行协调,因此您只需连接到该集群即可。