Submitting a word count topology to my Storm cluster: jar created with Eclipse, but it shows an exception

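I am trying to submit a word count topology to my Storm cluster. I built the jar with Eclipse, but running it shows an exception.

Can anyone tell me what to do? I am attaching my code and the exception here.

Spout creation: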

public class WordReader implements IRichSpout {
    private SpoutOutputCollector collecter;
    private BufferedReader bufferedreader;
    private FileReader filereader;
    private Boolean completed=false;
    private TopologyContext context;
    private final static Logger logger=LoggerFactory.getLogger(WordReader.class);
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("Ok"+msgId);
    }
    @Override
    public void activate() {
        // TODO Auto-generated method stub
        logger.info("Activating Storm");        
    }
    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("Fail"+msgId);   
    }
    @Override
    public void nextTuple() {
        // TODO Auto-generated method stub
        if(completed)
        {
            try
            {
                Thread.sleep(1000);
            }
            catch(InterruptedException e)
            {
                System.out.println("String is Interrupted");
            }
            // The file has already been read completely; nothing more to emit.
            return;
        }
        String line;
        bufferedreader=new BufferedReader(filereader);
        try {
            while((line=bufferedreader.readLine())!= null)
            {
                this.collecter.emit(new Values(line));
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally
        {
            completed=true;
        }   
    }
    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        // TODO Auto-generated method stub
        this.context=context;
        try {
            this.filereader=new FileReader(map.get("InputFile").toString());
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            throw new RuntimeException("Error reading file");
        }
            this.collecter=collector;}
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("line"));
    }
}

Bolt code:

public class WordNormalizer implements IRichBolt{
    private OutputCollector collecter;
    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }
    @Override
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        String sentence=input.getStringByField("line");
        String[] words=sentence.split(" ");
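        for(String word:words)
        {
            word=word.trim();
            if(!word.isEmpty())
            {
                word=word.toLowerCase();
                // Anchor each emitted word to the input tuple.
                ArrayList<Tuple> a=new ArrayList<Tuple>();
                a.add(input);
                this.collecter.emit(a,new Values(word));
            }
        }
        // Ack the input tuple once, after all of its words have been emitted.
        collecter.ack(input);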
    }
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            // Assign the parameter; "this.collecter=collecter" only reassigned the field to itself.
            this.collecter=collector;
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub  
        declarer.declare(new Fields("word"));
    }
}
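
The per-line logic of this bolt can be checked outside Storm. The following is only a sketch under assumptions: `WordNormalizerSketch` and `normalize` are hypothetical names that do not appear in the post, and it splits on any run of whitespace rather than on a single space as the bolt does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper, not part of the original post:
// the bolt's normalization step without any Storm classes.
class WordNormalizerSketch {

    // Splits a line on whitespace, lower-cases each token, and drops empty ones.
    static List<String> normalize(String line) {
        List<String> words = new ArrayList<String>();
        for (String raw : line.split("\\s+")) {
            String word = raw.trim().toLowerCase(Locale.ROOT);
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(normalize("  Storm counts WORDS  "));
    }
}
```

In the bolt itself, each element of the returned list would correspond to one `emit` call.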

Bolt that counts word frequency:

public class WordCountBolt implements IRichBolt {
    private OutputCollector collector;
    private Integer id;
    private String name;
    private Map<String, Integer> counter;
    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
        for(Map.Entry<String, Integer> entry : counter.entrySet())
        {
            System.out.println(entry.getKey()+" "+entry.getValue());
        }

    }
    @Override
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        String str=input.getStringByField("word");
        if(!counter.containsKey(str))
        {
            counter.put(str, 1);
        }
        else
        {
            Integer i=counter.get(str)+1;
            counter.put(str, i);
        }
    }
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.counter=new HashMap<String, Integer>();
        this.collector=collector;
        this.name=context.getThisComponentId();
        this.id=context.getThisTaskId();
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
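
The counting step in `execute` is plain map bookkeeping, so it can also be tried without a cluster. This is a minimal sketch, assuming a hypothetical class `WordCountSketch` that is not part of the post; it mirrors the `counter` map but does nothing Storm-specific.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original post:
// the same counting the bolt does, applied to an in-memory list of words.
class WordCountSketch {

    // Returns how many times each word occurs in the input.
    static Map<String, Integer> count(Iterable<String> words) {
        Map<String, Integer> counter = new HashMap<String, Integer>();
        for (String word : words) {
            Integer current = counter.get(word);
            counter.put(word, current == null ? 1 : current + 1);
        }
        return counter;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(Arrays.asList("storm", "bolt", "storm"));
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
    }
}
```

The print loop in `main` is the same shape as the one in the bolt's `cleanup`; the iteration order of a `HashMap` is not guaranteed.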

Main class that creates the topology:

public class StormMain {
    public static void main(String[] args)
    {
        //Configuration
        Config conf = new Config();
        conf.put("InputFile",args[0]);
        conf.setDebug(false);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCountBolt()).shuffleGrouping("word-normalizer");
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TopologyMain",conf,builder.createTopology());
        //Thread.sleep(1000);
        //cluster.killTopology("TopologyMain");
        cluster.shutdown();
    }
}

Edit: exception

This is the exception I get:

     org.apache.storm.zookeeper.server.NIOServerCnxn - caught end of stream exception
org.apache.storm.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x14cb812ae720003, likely client has closed socket
    at org.apache.storm.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) ~[storm-core-0.9.3.jar:0.9.3]
    at org.apache.storm.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-0.9.3.jar:0.9.3]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]
6056 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:55127 which had sessionid 0x14cb812ae720003
6057 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720005
6076 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720005 closed
6076 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:55133 which had sessionid 0x14cb812ae720005
6076 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6076 [main] INFO  backtype.storm.daemon.supervisor - Shutting down supervisor 3b4d74c2-9fa3-4b8d-beb8-419063c95c02
6077 [Thread-3] INFO  backtype.storm.event - Event manager interrupted
6077 [Thread-4] INFO  backtype.storm.event - Event manager interrupted
6078 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720007
6097 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:55139 which had sessionid 0x14cb812ae720007
6097 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720007 closed
6097 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6098 [main] INFO  backtype.storm.daemon.supervisor - Shutting down supervisor e94ee8a8-f38f-4ba4-a48f-4427a7c8d30d
6098 [Thread-5] INFO  backtype.storm.event - Event manager interrupted
6098 [Thread-6] INFO  backtype.storm.event - Event manager interrupted
6099 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720009
6117 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720009 closed
6117 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:55145 which had sessionid 0x14cb812ae720009
6118 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6118 [main] INFO  backtype.storm.testing - Shutting down in process zookeeper
6118 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method
6119 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - shutting down
6119 [main] INFO  org.apache.storm.zookeeper.server.SessionTrackerImpl - Shutting down
6119 [main] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Shutting down
6119 [main] INFO  org.apache.storm.zookeeper.server.SyncRequestProcessor - Shutting down
6119 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop!
6119 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
6119 [main] INFO  org.apache.storm.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
6120 [main] INFO  backtype.storm.testing - Done shutting down in process zookeeper
6120 [main] INFO  backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp35008e-119b-4ae3-a557-2839d573a579
6128 [main] INFO  backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Tempc53a710-6448-441e-bf01-734b80f9b989
6130 [main] INFO  backtype.storm.testing - Unable to delete file: C:\Users\Rishi\AppData\Local\Tempc53a710-6448-441e-bf01-734b80f9b989\version-2\log.1
6130 [main] INFO  backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Tempea20791-599d-483d-9ffd-37445005684c
6136 [main] INFO  backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\ef85048c-77e3-4392-8fc1-41bb4547ab53
8027 [SessionTracker] INFO  org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!

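We have to use StormSubmitter to submit the topology, not LocalCluster. LocalCluster only simulates a cluster inside the current JVM, so nothing is sent to the real cluster. I switched to StormSubmitter and it worked; my topology now shows up in the Storm UI.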

cluster.submitTopology("TopologyMain",conf,builder.createTopology());

Replace the line above with:

StormSubmitter.submitTopology("TopologyMain",conf,builder.createTopology());

Just increase the thread sleep time, for example:

Thread.sleep(10000);

Set it to 10000 and check again; the topology probably needs more time to finish than the shorter sleep allows.
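
Why the sleep matters: a LocalCluster runs the topology on background threads in the same JVM, so shutting it down right after submitting stops the work before it has processed anything. The sketch below illustrates that timing with a plain `ExecutorService`. It is an analogy only and uses no Storm classes; `ShutdownTimingSketch` and `runWithGrace` are hypothetical names, and the 100 ms task stands in for the topology's work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, not Storm code: background work that is
// shut down either immediately or after a grace period.
class ShutdownTimingSketch {

    // Starts one background task needing about 100 ms, waits graceMillis,
    // then forces shutdown. Returns how many tasks finished.
    static int runWithGrace(long graceMillis) {
        final AtomicInteger processed = new AtomicInteger();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);            // stands in for the topology's work
                    processed.incrementAndGet();
                } catch (InterruptedException e) {
                    // Shut down before the work finished.
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            Thread.sleep(graceMillis);            // plays the role of Thread.sleep(...) in main
            worker.shutdownNow();                 // plays the role of cluster.shutdown()
            worker.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("no grace:   " + runWithGrace(0));
        System.out.println("with grace: " + runWithGrace(1000));
    }
}
```

With no grace period the task is interrupted before it completes, so the first count should be 0; with a one-second grace period it should be 1. How long a real topology needs depends on the input file and the machine.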

I ran into a very similar problem when running the program; it got stuck at:

[SessionTracker] INFO  org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!

I solved it by removing:
cluster.shutdown();    

That call was preventing ZooKeeper from communicating with Storm.