如何在给螺栓时间完成的同时关闭 Apache Storm 中的喷口?

How to shutdown spout in Apache Storm while giving bolts time to finish?

我有一个 Storm 拓扑,它从远程服务器读取数据,然后处理其螺栓中的文本。

我想 deactivate/shutdown 当 spout 检测到没有剩余数据可供读取时,让 bolts 有时间完成它们的处理。

我尝试在 spout 中使用 deactivate() 方法,方法是在检测到没有数据可供读取时调用它。我不知道如何让它工作(找不到太多关于它的文档)。

当我检测到没有数据可供读取时,我决定继续调用我在 spout 中编写的方法。我在关闭拓扑时使用了 KillOptions 来指定一些等待时间,但它会在不等待的情况下调用该方法后立即继续杀死它。

private void endReading() {
    Map conf = Utils.readStormConfig();
    conf.put("nimbus.seeds", "localhost");
    NimbusClient cc = NimbusClient.getConfiguredClient(conf);
    Nimbus.Client client = cc.getClient();
    try {
         KillOptions ko = new KillOptions();
         ko.set_wait_secs(600);
         ko.set_wait_secs_isSet(true);
         client.killTopologyWithOpts("local-topology", ko); 
    } catch (TException e) {
        e.printStackTrace();
    }
}

我是否错误地使用了 KillOptions?有没有更简单的方法来关闭喷口,同时留出时间让螺栓完成?如有任何建议,我们将不胜感激。

编辑:我在本地 运行,这就是它无法正常工作的原因,如答案的后续行动中所述。

你做的是一个很好的解决方案。当您 运行 时,您发布的代码将关闭整个拓扑。请记住,它将关闭调用它的 spout 以及同一拓扑中的任何其他 spout。