如何在给螺栓时间完成的同时关闭 Apache Storm 中的喷口?
How to shutdown spout in Apache Storm while giving bolts time to finish?
我有一个 Storm 拓扑,它从远程服务器读取数据,然后处理其螺栓中的文本。
我想 deactivate/shutdown 当 spout 检测到没有剩余数据可供读取时,让 bolts 有时间完成它们的处理。
我尝试在 spout 中使用 deactivate() 方法,方法是在检测到没有数据可供读取时调用它。我不知道如何让它工作(找不到太多关于它的文档)。
当我检测到没有数据可供读取时,我决定继续调用我在 spout 中编写的方法。我在关闭拓扑时使用了 KillOptions 来指定一些等待时间,但它会在不等待的情况下调用该方法后立即继续杀死它。
private void endReading() {
Map conf = Utils.readStormConfig();
conf.put("nimbus.seeds", "localhost");
NimbusClient cc = NimbusClient.getConfiguredClient(conf);
Nimbus.Client client = cc.getClient();
try {
KillOptions ko = new KillOptions();
ko.set_wait_secs(600);
ko.set_wait_secs_isSet(true);
client.killTopologyWithOpts("local-topology", ko);
} catch (TException e) {
e.printStackTrace();
}
}
我是否错误地使用了 KillOptions?有没有更简单的方法来关闭喷口,同时留出时间让螺栓完成?如有任何建议,我们将不胜感激。
编辑:我在本地 运行,这就是它无法正常工作的原因,如答案的后续行动中所述。
你做的是一个很好的解决方案。当您 运行 时,您发布的代码将关闭整个拓扑。请记住,它将关闭调用它的 spout 以及同一拓扑中的任何其他 spout。
我有一个 Storm 拓扑,它从远程服务器读取数据,然后处理其螺栓中的文本。
我想 deactivate/shutdown 当 spout 检测到没有剩余数据可供读取时,让 bolts 有时间完成它们的处理。
我尝试在 spout 中使用 deactivate() 方法,方法是在检测到没有数据可供读取时调用它。我不知道如何让它工作(找不到太多关于它的文档)。
当我检测到没有数据可供读取时,我决定继续调用我在 spout 中编写的方法。我在关闭拓扑时使用了 KillOptions 来指定一些等待时间,但它会在不等待的情况下调用该方法后立即继续杀死它。
private void endReading() {
Map conf = Utils.readStormConfig();
conf.put("nimbus.seeds", "localhost");
NimbusClient cc = NimbusClient.getConfiguredClient(conf);
Nimbus.Client client = cc.getClient();
try {
KillOptions ko = new KillOptions();
ko.set_wait_secs(600);
ko.set_wait_secs_isSet(true);
client.killTopologyWithOpts("local-topology", ko);
} catch (TException e) {
e.printStackTrace();
}
}
我是否错误地使用了 KillOptions?有没有更简单的方法来关闭喷口,同时留出时间让螺栓完成?如有任何建议,我们将不胜感激。
编辑:我在本地 运行,这就是它无法正常工作的原因,如答案的后续行动中所述。
你做的是一个很好的解决方案。当您 运行 时,您发布的代码将关闭整个拓扑。请记住,它将关闭调用它的 spout 以及同一拓扑中的任何其他 spout。