java.io.NotSerializableException: com.mongodb.MongoCollectionImpl error

The application reads data from Kafka and writes it to MongoDB.

This is the error I am seeing:

java.io.NotSerializableException: com.mongodb.MongoCollectionImpl

The main class, where the topology is started:

ZkHosts zkHosts=new ZkHosts("localhost:2181");
String topic_name="test";
String consumer_group_id="storm";
String zookeeper_root="";
SpoutConfig kafkaConfig=new SpoutConfig(zkHosts,topic_name, zookeeper_root, consumer_group_id);
kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
//Mongo Setup
MongoClient mongoClient = new MongoClient( "zz.yyy.xx.abc",27017 );
mongoClient.setWriteConcern(WriteConcern.SAFE);
MongoDatabase db = mongoClient.getDatabase("IOT");
MongoCollection<Document> iotSampleColl = db.getCollection("iot_sample");
MongoInsertBolt  mongoInsertBolt  = new MongoInsertBolt(iotSampleColl);
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("KafkaSpout", kafkaSpout);
builder.setBolt("MongoInsertBolt", mongoInsertBolt).allGrouping("KafkaSpout");
Config conf = new Config();
LocalCluster cluster=new LocalCluster();
try{
    cluster.submitTopology("test", conf, builder.createTopology());
    cluster.shutdown();
}catch (Exception e) {
    System.out.println(e.getMessage());
}

The class that writes to MongoDB, MongoInsertBolt.class:

private static final long serialVersionUID = 2504213456001787553L;
protected MongoCollection<Document> iotSampleColl;

public MongoInsertBolt(MongoCollection<Document> iotSampleColl) {
    this.iotSampleColl = iotSampleColl;
}

@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
    String word=tuple.getString(0);
    Document packet = new Document();
    packet.put("IOT_trans",word);
    if((null == word) || (word.length() == 0))
    {
        return;
    }
    iotSampleColl.insertOne(packet);
    System.out.println("Word is..."+word);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    // TODO Auto-generated method stub
}

Please let me know if I am doing something wrong. Thanks in advance.

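Storm serializes every bolt instance when the topology is submitted, and MongoCollection is not Serializable, so it cannot be passed in through the bolt's constructor. Just move your MongoDB connection code into the prepare(...) method, which runs on the worker after the bolt has been deserialized, and create the bolt in the main class with new MongoInsertBolt():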

// IBasicBolt is an interface, so extend BaseBasicBolt, which implements it
public class MongoInsertBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 2504213456001787553L;
    // transient: the collection is not Serializable and must not travel with the bolt
    protected transient MongoCollection<Document> iotSampleColl;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Runs on the worker after deserialization, so the connection is opened where it is used
        MongoClient mongoClient = new MongoClient("zz.yyy.xx.abc", 27017);
        mongoClient.setWriteConcern(WriteConcern.SAFE);
        MongoDatabase db = mongoClient.getDatabase("IOT");
        iotSampleColl = db.getCollection("iot_sample");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String word = tuple.getString(0);
        // Skip empty messages before building the document
        if (word == null || word.isEmpty()) {
            return;
        }
        Document packet = new Document();
        packet.put("IOT_trans", word);
        iotSampleColl.insertOne(packet);
        System.out.println("Word is..." + word);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt is a sink and emits no tuples, so there are no fields to declare
    }
}
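
If it helps to see the mechanism in isolation, here is a minimal sketch that reproduces the exception with plain Java serialization, which is what Storm uses to ship bolt instances to the workers. Nothing in it comes from Storm or the MongoDB driver: FakeCollection, BrokenBolt, FixedBolt and SerializationDemo are hypothetical stand-ins made up for this illustration, and the no-argument prepare() only imitates the real prepare(Map, TopologyContext).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SerializationDemo {

    /** Hypothetical stand-in for MongoCollection: it is NOT Serializable. */
    static class FakeCollection {
        final List<String> docs = new ArrayList<>();
        void insertOne(String doc) { docs.add(doc); }
    }

    /** Mirrors the bolt from the question: the collection arrives through the constructor. */
    static class BrokenBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        protected FakeCollection coll;
        BrokenBolt(FakeCollection coll) { this.coll = coll; }
    }

    /** Mirrors the fix: the field is transient and is only filled in by prepare(). */
    static class FixedBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        protected transient FakeCollection coll;
        void prepare() { coll = new FakeCollection(); }
        void execute(String word) { coll.insertOne(word); }
    }

    /** Writes the object with standard Java serialization and returns the bytes. */
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Reads an object back from bytes produced by serialize(). */
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Same situation as in the question: a non-serializable field is set before submit
        try {
            serialize(new BrokenBolt(new FakeCollection()));
            System.out.println("BrokenBolt serialized (not expected)");
        } catch (NotSerializableException e) {
            System.out.println("BrokenBolt failed: " + e);
        }

        // Same situation as in the answer: serialize first, connect afterwards
        FixedBolt copy = (FixedBolt) deserialize(serialize(new FixedBolt()));
        copy.prepare();
        copy.execute("hello");
        System.out.println("FixedBolt stored " + copy.coll.docs);
    }
}
```

The transient keyword is not strictly required as long as the field is still null when the topology is submitted, but it documents the intent and keeps the bolt serializable even if someone later assigns the field earlier.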