将数据从一个螺栓发送到另一个 Apache Storm
Send data from one bolt to another Apache Storm
嗨,与 Apache Storm
一起工作。我有多个 kafka 主题,我想使用单个螺栓解析所有消息(使用并行处理负载)。
我想问一下可以吗?以下是我正在尝试的
Collection<SpoutSpec<? extends BaseRichBolt>> spouts; // I take this as a method argument
TopologyBuilder topology = new TopologyBuilder();
spouts.forEach(spec -> {
topology.setSpout(spec.getName() + "Spout", new KafkaSpout(spec.getSpoutConfig()), spec.getParallelism());
topology.setBolt("FileBeat-Bolt", new FileBeatMessageBolt(), spec.getParallelism()).shuffleGrouping(spec.getName() + "Spout");
topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt");
topology.setBolt("Output-Kafka-Bolt", new ProcessedOutputHandler(), spec.getParallelism()).shuffleGrouping("Message-Handling-Bolt");
});
我的 SpoutSpec Class
public class SpoutSpec<T extends BaseRichBolt> {
private final String name;
private final int parallelism;
private final SpoutConfig spoutConfig;
private final T handler;
}
但是消息没有从 FileBeat-Bolt
发送到其他螺栓。以下是我发送数据的方式:
JsonNode jsonNode = objectMapper.readValue(input.getString(0), JsonNode.class);
String topic = jsonNode.get("@metadata").get("topic").getTextValue();
String message = jsonNode.get("message").getTextValue();
collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
你的emit
调用有误。第一个参数不是螺栓名称,而是流名称。流名称用于您希望将来自螺栓的消息划分为多个数据流的情况。在您的情况下,您不想拆分流。
collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
将发送到一个名为 "Message-Handling-Bolt" 的流,而您在该流上没有任何监听。您的 "Message-Handling-Bolt" 正在收听默认流。要么将第一个参数设为 emit
,要么将螺栓声明更改为:
topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt", "Message-Handling-Bolt");
编辑:回复您的评论:
最简单的解决方案是简单地在 emit 调用中删除第一个参数:
collector.emit(input, new Values(topic, message));
如果出于某种原因您不想这样做,并且想要明确命名流,您需要声明您的 FileBeatMessageBolt
将发送到 Message-Handling-Bolt
流。您将此作为 declareOutputFields
实施的一部分:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("Message-Handling-Bolt", new Fields(...));
}
嗨,与 Apache Storm
一起工作。我有多个 kafka 主题,我想使用单个螺栓解析所有消息(使用并行处理负载)。
我想问一下可以吗?以下是我正在尝试的
Collection<SpoutSpec<? extends BaseRichBolt>> spouts; // I take this as a method argument
TopologyBuilder topology = new TopologyBuilder();
spouts.forEach(spec -> {
topology.setSpout(spec.getName() + "Spout", new KafkaSpout(spec.getSpoutConfig()), spec.getParallelism());
topology.setBolt("FileBeat-Bolt", new FileBeatMessageBolt(), spec.getParallelism()).shuffleGrouping(spec.getName() + "Spout");
topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt");
topology.setBolt("Output-Kafka-Bolt", new ProcessedOutputHandler(), spec.getParallelism()).shuffleGrouping("Message-Handling-Bolt");
});
我的 SpoutSpec Class
public class SpoutSpec<T extends BaseRichBolt> {
private final String name;
private final int parallelism;
private final SpoutConfig spoutConfig;
private final T handler;
}
但是消息没有从 FileBeat-Bolt
发送到其他螺栓。以下是我发送数据的方式:
JsonNode jsonNode = objectMapper.readValue(input.getString(0), JsonNode.class);
String topic = jsonNode.get("@metadata").get("topic").getTextValue();
String message = jsonNode.get("message").getTextValue();
collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
你的emit
调用有误。第一个参数不是螺栓名称,而是流名称。流名称用于您希望将来自螺栓的消息划分为多个数据流的情况。在您的情况下,您不想拆分流。
collector.emit("Message-Handling-Bolt", input, new Values(topic, message));
将发送到一个名为 "Message-Handling-Bolt" 的流,而您在该流上没有任何监听。您的 "Message-Handling-Bolt" 正在收听默认流。要么将第一个参数设为 emit
,要么将螺栓声明更改为:
topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt", "Message-Handling-Bolt");
编辑:回复您的评论: 最简单的解决方案是简单地在 emit 调用中删除第一个参数:
collector.emit(input, new Values(topic, message));
如果出于某种原因您不想这样做,并且想要明确命名流,您需要声明您的 FileBeatMessageBolt
将发送到 Message-Handling-Bolt
流。您将此作为 declareOutputFields
实施的一部分:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("Message-Handling-Bolt", new Fields(...));
}