Insert data from Apache Storm into Azure Cosmos DB
I am trying to insert data from Storm into Cosmos DB (MongoDB API).
// Connect using the connection string copied from the Azure portal
MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb uri from azure portal"));
// Get database
MongoDatabase database = mongoClient.getDatabase("toystore");
// Get collection
MongoCollection<Document> collection = database.getCollection("order");
this.productid = tuple.getIntegerByField("productid");
this.quantity = tuple.getIntegerByField("quantity");
this.sales = tuple.getIntegerByField("sales");
this.refund = tuple.getIntegerByField("refund");
this.orderdate = tuple.getStringByField("orderdate");
// Insert documents
Document document = new Document();
document.append("productid", this.productid);
document.append("quantity", this.quantity);
document.append("sales", this.sales);
document.append("refund", this.refund);
document.append("orderdate", this.orderdate);
collection.insertOne(document);
The data should be inserted into Cosmos DB. With the same code, I can insert into Cosmos DB from a standalone Java program outside Storm.
2017-12-05 03:45:03.345 o.a.s.d.executor [INFO] Opened spout eventhub-spout:(4)
2017-12-05 03:45:03.346 o.a.s.d.executor [INFO] Activating spout eventhub-spout:(4)
2017-12-05 03:45:09.618 o.m.d.cluster [INFO] Cluster created with settings {hosts=[toystore.documents.azure.com:10255], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='globaldb'}
2017-12-05 03:45:09.619 o.m.d.cluster [INFO] Adding discovered server toystore.documents.azure.com:10255 to client view of cluster
2017-12-05 03:45:09.629 o.a.s.util [ERROR] Async loop died!
java.lang.ExceptionInInitializerError: null
at com.mongodb.connection.InternalStreamConnectionFactory.<init>(InternalStreamConnectionFactory.java:41) ~[stormjar.jar:?]
at com.mongodb.connection.DefaultClusterableServerFactory.create(DefaultClusterableServerFactory.java:68) ~[stormjar.jar:?]
at com.mongodb.connection.BaseCluster.createServer(BaseCluster.java:360) ~[stormjar.jar:?]
at com.mongodb.connection.MultiServerCluster.addServer(MultiServerCluster.java:305) ~[stormjar.jar:?]
at com.mongodb.connection.MultiServerCluster.<init>(MultiServerCluster.java:83) ~[stormjar.jar:?]
at com.mongodb.connection.DefaultClusterFactory.create(DefaultClusterFactory.java:116) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:744) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:728) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:702) ~[stormjar.jar:?]
at com.mongodb.Mongo.<init>(Mongo.java:310) ~[stormjar.jar:?]
at com.mongodb.Mongo.<init>(Mongo.java:306) ~[stormjar.jar:?]
at com.mongodb.MongoClient.<init>(MongoClient.java:284) ~[stormjar.jar:?]
at com.microsoft.example.CosmosDBBolt.execute(CosmosDBBolt.java:108) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__9841$tuple_action_fn__9843.invoke(executor.clj:730) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__9762.invoke(executor.clj:462) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.disruptor$clojure_handler$reify__874.onEvent(disruptor.clj:40) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.daemon.executor$fn__9841$fn__9854$fn__9907.invoke(executor.clj:849) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.util$async_loop$fn__558.invoke(util.clj:484) [storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
Caused by: java.lang.NullPointerException
at com.mongodb.connection.ClientMetadataHelper.getDriverVersion(ClientMetadataHelper.java:118) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.getDriverInformation(ClientMetadataHelper.java:201) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.addDriverInformation(ClientMetadataHelper.java:182) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.<clinit>(ClientMetadataHelper.java:64) ~[stormjar.jar:?]
... 23 more
It connects to Cosmos DB, but the connection is then dropped.
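One thing the stack trace shows is that `MongoClient` is constructed inside `CosmosDBBolt.execute`, so a new client is built for every tuple. Below is a minimal sketch of the usual alternative, creating the client once in the bolt's `prepare` step and reusing it in `execute`. `FakeClient` and `SketchBolt` are hypothetical stand-ins used only to keep the example self-contained; they are not the real MongoDB or Storm classes, and this change alone may not explain the `NullPointerException` raised in `ClientMetadataHelper`.

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseClientSketch {

    /** Hypothetical stand-in for MongoClient; counts how often it is constructed. */
    static class FakeClient {
        static int created = 0;
        final List<String> inserted = new ArrayList<>();

        FakeClient() {
            created++;
        }

        void insertOne(String document) {
            inserted.add(document);
        }
    }

    /** Hypothetical stand-in for a bolt with a prepare/execute lifecycle. */
    static class SketchBolt {
        private FakeClient client;

        // Called once per bolt instance, before any tuple arrives.
        void prepare() {
            client = new FakeClient();
        }

        // Called once per tuple; reuses the client built in prepare().
        void execute(String tuple) {
            client.insertOne(tuple);
        }

        int insertedCount() {
            return client.inserted.size();
        }
    }

    /** Runs the bolt over the given number of tuples and returns how many clients were built. */
    static int clientsCreatedFor(int tuples) {
        FakeClient.created = 0;
        SketchBolt bolt = new SketchBolt();
        bolt.prepare();
        for (int i = 0; i < tuples; i++) {
            bolt.execute("order-" + i);
        }
        return FakeClient.created;
    }

    public static void main(String[] args) {
        System.out.println("clients created: " + clientsCreatedFor(5));
    }
}
```

In a real bolt, the same idea would mean holding the client in a `transient` field assigned in `prepare(...)`, since the bolt object is serialized before it is shipped to the workers.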
Thanks,
Ahmed