ERROR backtype.storm.util - Async loop died! java.lang.RuntimeException: java.lang.NullPointerException

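I am trying to write the output of a Storm word-count program to MongoDB. This is the error I get when I run the program. I can print the result successfully; the problem starts when I try to write the output to the database.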

Error generated:

Jun 01, 2015 3:24:23 PM com.mongodb.DBTCPConnector fetchMaxBsonObjectSize
WARNING: Exception determining maxBSON size using0
java.io.IOException: couldn't connect to [Ritesh-RM/127.0.1.1:27017] bc:java.net.ConnectException: Connection refused
at com.mongodb.DBPort._open(DBPort.java:206)
at com.mongodb.DBPort.go(DBPort.java:94)
at com.mongodb.DBPort.go(DBPort.java:75)
at com.mongodb.DBPort.findOne(DBPort.java:129)
at com.mongodb.DBPort.runCommand(DBPort.java:138)
at com.mongodb.DBTCPConnector.fetchMaxBsonObjectSize(DBTCPConnector.java:419)
at com.mongodb.Mongo.getMaxBsonObjectSize(Mongo.java:541)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:237)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210)
at com.mongodb.DBCollection.insert(DBCollection.java:80)
at bolts.WordCounter.execute(WordCounter.java:88)
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401)
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)

Jun 01, 2015 3:24:23 PM com.mongodb.DBTCPConnector fetchMaxBsonObjectSize
WARNING: Exception determining maxBSON size using0
java.io.IOException: couldn't connect to [Ritesh-RM/127.0.1.1:27017] bc:java.net.ConnectException: Connection refused
at com.mongodb.DBPort._open(DBPort.java:206)
at com.mongodb.DBPort.go(DBPort.java:94)
at com.mongodb.DBPort.go(DBPort.java:75)
at com.mongodb.DBPort.findOne(DBPort.java:129)
at com.mongodb.DBPort.runCommand(DBPort.java:138)
at com.mongodb.DBTCPConnector.fetchMaxBsonObjectSize(DBTCPConnector.java:419)
at com.mongodb.DBTCPConnector.checkMaster(DBTCPConnector.java:406)
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:144)
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:137)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:255)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210)
at com.mongodb.DBCollection.insert(DBCollection.java:80)
at bolts.WordCounter.execute(WordCounter.java:88)
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401)
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)

Jun 01, 2015 3:24:23 PM com.mongodb.DBPortPool gotError
WARNING: emptying DBPortPool to localhost:27017 b/c of error
java.io.IOException: couldn't connect to [Ritesh-RM/127.0.1.1:27017] bc:java.net.ConnectException: Connection refused
at com.mongodb.DBPort._open(DBPort.java:206)
at com.mongodb.DBPort.go(DBPort.java:94)
at com.mongodb.DBPort.go(DBPort.java:75)
at com.mongodb.DBPort.say(DBPort.java:70)
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:151)
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:137)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:255)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210)
at com.mongodb.DBCollection.insert(DBCollection.java:80)
at bolts.WordCounter.execute(WordCounter.java:88)
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401)
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)

8037 [Thread-9-word-counter] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.NullPointerException: null
at com.mongodb.DBTCPConnector._error(DBTCPConnector.java:287) ~[com.mongodb.jar:na]
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:161) ~[com.mongodb.jar:na]
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:137) ~[com.mongodb.jar:na]
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:255) ~[com.mongodb.jar:na]
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210) ~[com.mongodb.jar:na]
at com.mongodb.DBCollection.insert(DBCollection.java:80) ~[com.mongodb.jar:na]
at bolts.WordCounter.execute(WordCounter.java:88) ~[classes/:na]
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
... 6 common frames omitted
8038 [Thread-9-word-counter] ERROR backtype.storm.daemon.executor - 
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.NullPointerException: null
at com.mongodb.DBTCPConnector._error(DBTCPConnector.java:287) ~[com.mongodb.jar:na]
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:161) ~[com.mongodb.jar:na]
at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:137) ~[com.mongodb.jar:na]
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:255) ~[com.mongodb.jar:na]
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:210) ~[com.mongodb.jar:na]
at com.mongodb.DBCollection.insert(DBCollection.java:80) ~[com.mongodb.jar:na]
at bolts.WordCounter.execute(WordCounter.java:88) ~[classes/:na]
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
... 6 common frames omitted
8137 [Thread-9-word-counter] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
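
The warnings above show the driver being refused on port 27017 before the NullPointerException is thrown. As a rough diagnostic that is not part of the original program, a plain-JDK check along these lines could confirm whether anything is accepting connections on that port. The class name `PortCheck`, the method `isListening`, and the one-second timeout are made up for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Storm or the MongoDB driver.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolved host all end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port taken from the bolt code in this post.
        boolean up = isListening("localhost", 27017, 1000);
        System.out.println("localhost:27017 listening? " + up);
    }
}
```

If this prints `false`, the server is probably not running or is bound to a different address. A `true` result only shows that the port is open, not that the driver and server versions are compatible.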

**WordCounter bolt**

public class WordCounter extends BaseBasicBolt {

Integer id;
String name;
Map<String, Integer> counters;

/**
 * At the end of the spout (when the cluster is shut down),
 * we will show the word counters.
 */
@Override
public void cleanup() {
    /*try{
        MongoClient client = new MongoClient(new ServerAddress("localhost", 27017));
        DB database = client.getDB("storm");
        final DBCollection collection = database.getCollection("query");

        BasicDBObject dbObject = new BasicDBObject();
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
        dbObject.put("searchItem",entry.getKey());
        dbObject.put("count",entry.getValue());
        collection.insert(dbObject);
        }
        client.close();

    }catch (UnknownHostException e) {
        e.printStackTrace();
    }*/
}

/**
 * On create 
 */
@Override
public void prepare(Map stormConf, TopologyContext context) {
    this.counters = new HashMap<String, Integer>();
    this.name = context.getThisComponentId();
    this.id = context.getThisTaskId();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}


@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    String str = input.getString(0);
    /**
     * If the word doesn't exist in the map yet, create an entry for it;
     * otherwise add 1 to its count.
     */
    try {
        Mongo mongo = new Mongo("localhost", 27017);
        DB db = mongo.getDB("query");

        final DBCollection table = db.getCollection("query");

        BasicDBObject dbObject = new BasicDBObject();

        if (!counters.containsKey(str)) {
            counters.put(str, 1);
            dbObject.put("searchItem", str);
            dbObject.put("count", 1);
            table.insert(dbObject);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
            BasicDBObject query = new BasicDBObject();
            query.put("searchItem", str);
            BasicDBObject documentDetail = new BasicDBObject();

            documentDetail.put("count", c);
            BasicDBObject updateObj = new BasicDBObject();
            updateObj.put("$set", documentDetail);

            table.update(query, updateObj);
        }
    } catch (UnknownHostException e) {
        e.printStackTrace();
    }
    System.out.println("In Counter : ");
}

}
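
The counting step in `execute` can be looked at separately from the database calls. The following is a minimal sketch of just that logic, using only the JDK so it can be run without Storm or MongoDB. The class `WordCounts` and its methods are hypothetical names for illustration and do not appear in the bolt above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the bolt's in-memory "counters" map.
final class WordCounts {

    private final Map<String, Integer> counters = new HashMap<String, Integer>();

    /** Records one more occurrence of word and returns its new total. */
    int increment(String word) {
        Integer current = counters.get(word);
        int next = (current == null) ? 1 : current + 1;
        counters.put(word, next);
        return next;
    }

    /** Returns the current total for word, or 0 if it has not been seen. */
    int get(String word) {
        Integer current = counters.get(word);
        return (current == null) ? 0 : current;
    }

    public static void main(String[] args) {
        WordCounts counts = new WordCounts();
        counts.increment("storm");
        counts.increment("storm");
        counts.increment("mongo");
        System.out.println("storm=" + counts.get("storm") + ", mongo=" + counts.get("mongo"));
    }
}
```

A return value of 1 from `increment` corresponds to the insert branch in the bolt, and any larger value corresponds to the update branch. Testing the counting this way can help tell a logic problem from a connection problem.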

I was getting this error because of the MongoDB driver version. I changed it in my pom.xml, and now everything works fine.

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.0.1</version>
</dependency>