InvalidTopologyException(msg:Component: [x] 从不存在的流 [y] 订阅

InvalidTopologyException(msg:Component: [x] subscribes from non-existent stream [y]

我正在尝试从 kafka 读取数据并使用 storm 插入到 cassandra 中。我也配置了拓扑,但是我遇到了一些问题,我不知道为什么会这样。

这是我提交的作品。

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
        topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
        topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt"); 

在这里,如果我注释最后一行,我没有看到任何异常。在最后一行,我收到以下错误:

InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])

谁能帮帮我,这里有什么问题吗?

这里是CheckingBolt中的outputFieldDeclarer

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}

我在 CassandraInsertBolt 的 declareOutputFields 方法中没有任何内容,因为该螺栓不会发出任何值。

TIA

这里的问题是您混淆了流名称和组件(即 spout/bolt)名称。组件名称用于指代不同的螺栓,而流名称用于指代来自同一螺栓的不同流。例如,如果您有一个名为 "evenOrOddBolt" 的螺栓,它可能会发出两个流,一个 "even" 流和 "odd" 流。但在许多情况下,一个 bolt 中只有一个流,这就是为什么 Storm 有一些使用默认流名称的便捷方法。

当您执行 .shuffleGrouping("checkingbolt") 时,您正在使用这些便捷方法之一,有效地说 "I want this bolt to consume the default stream coming out of the checkingbolt"。如果您想显式命名流,可以使用此方法的重载版本,但它仅在您有多个流来自同一个螺栓时才有用。

当您执行 ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"})); 时,您是说螺栓将在名为 "cassandraBoltStream" 的流上发射。这可能不是您想要做的,您想要声明它将在默认流上发出。您可以使用 ofd.declare 方法来执行此操作。

参考the documentation了解更多详情。