InvalidTopologyException(msg:Component: [x] 从不存在的流 [y] 订阅
InvalidTopologyException(msg:Component: [x] subscribes from non-existent stream [y]
我正在尝试从 kafka 读取数据并使用 storm 插入到 cassandra 中。我也配置了拓扑,但是我遇到了一些问题,我不知道为什么会这样。
这是我提交的作品。
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt");
在这里,如果我注释最后一行,我没有看到任何异常。在最后一行,我收到以下错误:
InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])
谁能帮帮我,这里有什么问题吗?
这里是CheckingBolt中的outputFieldDeclarer
public void declareOutputFields(OutputFieldsDeclarer ofd) {
ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}
我在 CassandraInsertBolt 的 declareOutputFields 方法中没有任何内容,因为该螺栓不会发出任何值。
TIA
这里的问题是您混淆了流名称和组件(即 spout/bolt)名称。组件名称用于指代不同的螺栓,而流名称用于指代来自同一螺栓的不同流。例如,如果您有一个名为 "evenOrOddBolt" 的螺栓,它可能会发出两个流,一个 "even" 流和 "odd" 流。但在许多情况下,一个 bolt 中只有一个流,这就是为什么 Storm 有一些使用默认流名称的便捷方法。
当您执行 .shuffleGrouping("checkingbolt")
时,您正在使用这些便捷方法之一,有效地说 "I want this bolt to consume the default stream coming out of the checkingbolt
"。如果您想显式命名流,可以使用此方法的重载版本,但它仅在您有多个流来自同一个螺栓时才有用。
当您执行 ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
时,您是说螺栓将在名为 "cassandraBoltStream" 的流上发射。这可能不是您想要做的,您想要声明它将在默认流上发出。您可以使用 ofd.declare
方法来执行此操作。
参考the documentation了解更多详情。
我正在尝试从 kafka 读取数据并使用 storm 插入到 cassandra 中。我也配置了拓扑,但是我遇到了一些问题,我不知道为什么会这样。
这是我提交的作品。
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt");
在这里,如果我注释最后一行,我没有看到任何异常。在最后一行,我收到以下错误:
InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])
谁能帮帮我,这里有什么问题吗?
这里是CheckingBolt中的outputFieldDeclarer
public void declareOutputFields(OutputFieldsDeclarer ofd) {
ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}
我在 CassandraInsertBolt 的 declareOutputFields 方法中没有任何内容,因为该螺栓不会发出任何值。
TIA
这里的问题是您混淆了流名称和组件(即 spout/bolt)名称。组件名称用于指代不同的螺栓,而流名称用于指代来自同一螺栓的不同流。例如,如果您有一个名为 "evenOrOddBolt" 的螺栓,它可能会发出两个流,一个 "even" 流和 "odd" 流。但在许多情况下,一个 bolt 中只有一个流,这就是为什么 Storm 有一些使用默认流名称的便捷方法。
当您执行 .shuffleGrouping("checkingbolt")
时,您正在使用这些便捷方法之一,有效地说 "I want this bolt to consume the default stream coming out of the checkingbolt
"。如果您想显式命名流,可以使用此方法的重载版本,但它仅在您有多个流来自同一个螺栓时才有用。
当您执行 ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
时,您是说螺栓将在名为 "cassandraBoltStream" 的流上发射。这可能不是您想要做的,您想要声明它将在默认流上发出。您可以使用 ofd.declare
方法来执行此操作。
参考the documentation了解更多详情。