Insert data from Apache Storm to Azure Cosmos DB

I'm trying to insert data from Storm into Cosmos DB through its MongoDB API:

    // Inside CosmosDBBolt.execute(Tuple tuple); uses com.mongodb.MongoClient,
    // com.mongodb.MongoClientURI, com.mongodb.client.MongoDatabase,
    // com.mongodb.client.MongoCollection and org.bson.Document
    MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb uri from azure portal"));

    // Get database
    MongoDatabase database = mongoClient.getDatabase("toystore");

    // Get collection
    MongoCollection<Document> collection = database.getCollection("order");

    // Read the order fields from the incoming tuple
    this.productid = tuple.getIntegerByField("productid");
    this.quantity = tuple.getIntegerByField("quantity");
    this.sales = tuple.getIntegerByField("sales");
    this.refund = tuple.getIntegerByField("refund");
    this.orderdate = tuple.getStringByField("orderdate");

    // Build and insert the document
    Document document = new Document()
            .append("productid", this.productid)
            .append("quantity", this.quantity)
            .append("sales", this.sales)
            .append("refund", this.refund)
            .append("orderdate", this.orderdate);
    collection.insertOne(document);

    // A new client is created for every tuple, so release its connections here
    mongoClient.close();

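The data should be inserted into Cosmos DB. The same code inserts data successfully when run from a standalone Java program outside Storm, but inside the Storm topology the worker log shows: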

2017-12-05 03:45:03.345 o.a.s.d.executor [INFO] Opened spout eventhub-spout:(4)
2017-12-05 03:45:03.346 o.a.s.d.executor [INFO] Activating spout eventhub-spout:(4)
2017-12-05 03:45:09.618 o.m.d.cluster [INFO] Cluster created with settings {hosts=[toystore.documents.azure.com:10255], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='globaldb'}
2017-12-05 03:45:09.619 o.m.d.cluster [INFO] Adding discovered server toystore.documents.azure.com:10255 to client view of cluster
2017-12-05 03:45:09.629 o.a.s.util [ERROR] Async loop died!
java.lang.ExceptionInInitializerError: null
    at com.mongodb.connection.InternalStreamConnectionFactory.<init>(InternalStreamConnectionFactory.java:41) ~[stormjar.jar:?]
    at com.mongodb.connection.DefaultClusterableServerFactory.create(DefaultClusterableServerFactory.java:68) ~[stormjar.jar:?]
    at com.mongodb.connection.BaseCluster.createServer(BaseCluster.java:360) ~[stormjar.jar:?]
    at com.mongodb.connection.MultiServerCluster.addServer(MultiServerCluster.java:305) ~[stormjar.jar:?]
    at com.mongodb.connection.MultiServerCluster.<init>(MultiServerCluster.java:83) ~[stormjar.jar:?]
    at com.mongodb.connection.DefaultClusterFactory.create(DefaultClusterFactory.java:116) ~[stormjar.jar:?]
    at com.mongodb.Mongo.createCluster(Mongo.java:744) ~[stormjar.jar:?]
    at com.mongodb.Mongo.createCluster(Mongo.java:728) ~[stormjar.jar:?]
    at com.mongodb.Mongo.createCluster(Mongo.java:702) ~[stormjar.jar:?]
    at com.mongodb.Mongo.<init>(Mongo.java:310) ~[stormjar.jar:?]
    at com.mongodb.Mongo.<init>(Mongo.java:306) ~[stormjar.jar:?]
    at com.mongodb.MongoClient.<init>(MongoClient.java:284) ~[stormjar.jar:?]
    at com.microsoft.example.CosmosDBBolt.execute(CosmosDBBolt.java:108) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$fn__9841$tuple_action_fn__9843.invoke(executor.clj:730) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__9762.invoke(executor.clj:462) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
    at org.apache.storm.disruptor$clojure_handler$reify__874.onEvent(disruptor.clj:40) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
    at org.apache.storm.daemon.executor$fn__9841$fn__9854$fn__9907.invoke(executor.clj:849) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
    at org.apache.storm.util$async_loop$fn__558.invoke(util.clj:484) [storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
Caused by: java.lang.NullPointerException
    at com.mongodb.connection.ClientMetadataHelper.getDriverVersion(ClientMetadataHelper.java:118) ~[stormjar.jar:?]
    at com.mongodb.connection.ClientMetadataHelper.getDriverInformation(ClientMetadataHelper.java:201) ~[stormjar.jar:?]
    at com.mongodb.connection.ClientMetadataHelper.addDriverInformation(ClientMetadataHelper.java:182) ~[stormjar.jar:?]
    at com.mongodb.connection.ClientMetadataHelper.<clinit>(ClientMetadataHelper.java:64) ~[stormjar.jar:?]
    ... 23 more
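The trace shows the NullPointerException being thrown in ClientMetadataHelper.getDriverVersion while ClientMetadataHelper is being statically initialized, that is, while the driver looks up its own version and before any document is sent. One possible explanation, not confirmed in this thread, is that the driver classes packed into stormjar.jar do not carry the jar metadata the driver expects. The following is a hedged diagnostic sketch using only JDK APIs; the class JarManifestCheck and its describe method are hypothetical names. It reports which jar a class was loaded from and whether that jar has a manifest.

```java
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.CodeSource;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/**
 * Hypothetical diagnostic helper (an assumption, not the fix from the linked answer):
 * reports where a class was loaded from and whether that jar has a manifest.
 */
final class JarManifestCheck {

    private JarManifestCheck() {}

    static String describe(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Classes from the bootstrap loader (e.g. java.lang.String) have no code source.
            return cls.getName() + ": no code source";
        }
        URL location = source.getLocation();
        File file;
        try {
            file = new File(location.toURI());
        } catch (URISyntaxException | IllegalArgumentException e) {
            // Not a plain file: URL (for example a jar nested inside another jar)
            return cls.getName() + ": unreadable location " + location;
        }
        if (!file.isFile() || !file.getName().endsWith(".jar")) {
            return cls.getName() + ": loaded from " + file + " (not a jar)";
        }
        try (JarFile jar = new JarFile(file)) {
            Manifest manifest = jar.getManifest();
            if (manifest == null) {
                return cls.getName() + ": " + file.getName() + " has NO manifest";
            }
            // List the main-section attribute names so missing entries are easy to spot
            Attributes main = manifest.getMainAttributes();
            return cls.getName() + ": " + file.getName() + " manifest attributes " + main.keySet();
        } catch (IOException e) {
            return cls.getName() + ": cannot open " + file + " (" + e.getMessage() + ")";
        }
    }

    public static void main(String[] args) throws Exception {
        // Defaults to the driver class from the trace; it must be on the classpath.
        String name = args.length > 0 ? args[0] : "com.mongodb.MongoClient";
        System.out.println(describe(Class.forName(name)));
    }
}
```

Inside the topology, logging the result of JarManifestCheck.describe(MongoClient.class) once (for example from the bolt's prepare method) would show what the Storm worker actually loaded, which can then be compared with the standalone program that works.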

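The client starts connecting to Cosmos DB (the cluster is created and the server toystore.documents.azure.com:10255 is discovered), but the bolt's executor thread then dies with the exception above.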

Thanks, Ahmed

Larry from Microsoft answered this question on the MSDN forum. Here is the link to the answer: https://social.msdn.microsoft.com/Forums/azure/en-US/1eb4f5af-a4b7-4bab-8e3d-9dfaa736e7bd/insert-data-from-hdinsight-storm-to-azure-cosmos-db?forum=hdinsight

The code is here: https://github.com/Blackmist/hdinsight-java-storm-mongodb