将 storm 的字数统计拓扑与 kafka 集成
integrating word count topology of storm with kafka
我正在尝试将 storm 的字数统计程序与 kafka 集成,因为我的生产者工作正常,即它正在读取文本文件并将每一行作为消息发送,我可以在简单的消费者控制台中看到这些消息.
现在为了将它与 storm 集成,即将那些 messages/lines 发送到消费者 spout,我刚刚用来自 storm-spout 集成依赖项的 kafka spout 替换了之前的字数统计程序的 storm spout,程序的其余部分是相同的,我是试图在 eclipse 中 运行 它但它没有被执行,我不知道是什么问题,甚至不知道我是否以正确的方式这样做,这是我的主要 class -
package com.spnotes.storm;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import com.spnotes.storm.bolts.WordCounterBolt;
import com.spnotes.storm.bolts.WordSpitterBolt;
public class WordCount {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
BrokerHosts hosts = new ZkHosts("localhost:9092");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "localhost:2181", "id1");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader-spout", kafkaSpout);
builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");
builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-spitter");
LocalCluster cluster = new LocalCluster();
System.out.println("submit topology");
Thread.sleep(10000);
//StormSubmitter.submitTopology("HelloStorm5", config, builder.createTopology());
cluster.submitTopology("HelloStorm5", config, builder.createTopology());
cluster.shutdown();
}
}
有 2 个螺栓 WordSplitterBolt() 和 WordCounterBolt() ,Wordsplitterbolt 将每个 line/message 分解为 tokens/words 并且 WordCounterBolt 正在计算每个单词。谁能告诉我我做错了什么?我需要创建自己的喷口而不是使用预定义的 KafkaSpout 吗?我的主要 class 正确吗?
更改代码:
BrokerHosts hosts = new ZkHosts(zkConnect);
zkConnect 是 zookeeper 主机名和端口,不适用于 kafka。将其更改为 localhost:2181
正如在聊天中讨论的与代码相关的休息问题。
问题出在 Maven dependency.include 中 POM.xml 需要所有依赖项。
我正在尝试将 storm 的字数统计程序与 kafka 集成,因为我的生产者工作正常,即它正在读取文本文件并将每一行作为消息发送,我可以在简单的消费者控制台中看到这些消息. 现在为了将它与 storm 集成,即将那些 messages/lines 发送到消费者 spout,我刚刚用来自 storm-spout 集成依赖项的 kafka spout 替换了之前的字数统计程序的 storm spout,程序的其余部分是相同的,我是试图在 eclipse 中 运行 它但它没有被执行,我不知道是什么问题,甚至不知道我是否以正确的方式这样做,这是我的主要 class -
package com.spnotes.storm;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import com.spnotes.storm.bolts.WordCounterBolt;
import com.spnotes.storm.bolts.WordSpitterBolt;
public class WordCount {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
BrokerHosts hosts = new ZkHosts("localhost:9092");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "localhost:2181", "id1");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader-spout", kafkaSpout);
builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");
builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-spitter");
LocalCluster cluster = new LocalCluster();
System.out.println("submit topology");
Thread.sleep(10000);
//StormSubmitter.submitTopology("HelloStorm5", config, builder.createTopology());
cluster.submitTopology("HelloStorm5", config, builder.createTopology());
cluster.shutdown();
}
}
有 2 个螺栓 WordSplitterBolt() 和 WordCounterBolt() ,Wordsplitterbolt 将每个 line/message 分解为 tokens/words 并且 WordCounterBolt 正在计算每个单词。谁能告诉我我做错了什么?我需要创建自己的喷口而不是使用预定义的 KafkaSpout 吗?我的主要 class 正确吗?
更改代码:
BrokerHosts hosts = new ZkHosts(zkConnect);
zkConnect 是 zookeeper 主机名和端口,不适用于 kafka。将其更改为 localhost:2181
正如在聊天中讨论的与代码相关的休息问题。
问题出在 Maven dependency.include 中 POM.xml 需要所有依赖项。