Вставка данных из Apache Storm в Azure Cosmos DB
Пытаюсь вставить данные из Apache Storm в Azure Cosmos DB (через API MongoDB):
// Builds one document from the incoming tuple and inserts it into the
// "order" collection of the "toystore" database.
// NOTE(review): the accompanying stack trace shows MongoClient being constructed
// inside CosmosDBBolt.execute, which suggests this runs for every tuple — consider
// creating the client once in prepare() and closing it in cleanup(); confirm
// against the full bolt class.
MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb uri from azure portal"));
try {
    // Get database
    MongoDatabase database = mongoClient.getDatabase("toystore");
    // Get collection
    MongoCollection<Document> collection = database.getCollection("order");

    // Copy the tuple's values into this object's fields.
    this.productid = tuple.getIntegerByField("productid");
    this.quantity = tuple.getIntegerByField("quantity");
    this.sales = tuple.getIntegerByField("sales");
    this.refund = tuple.getIntegerByField("refund");
    this.orderdate = tuple.getStringByField("orderdate");

    // Insert documents
    Document document = new Document()
            .append("productid", this.productid)
            .append("quantity", this.quantity)
            .append("sales", this.sales)
            .append("refund", this.refund)
            .append("orderdate", this.orderdate);
    collection.insertOne(document);
} finally {
    // The client is local to this fragment, so it must be closed here;
    // otherwise its connections and background threads are leaked on every run.
    mongoClient.close();
}
Ожидается, что данные будут вставлены в Cosmos DB. Этот же код успешно вставляет данные в Cosmos DB из отдельной Java-программы, вне Storm. При запуске в Storm в логе появляется следующее:
2017-12-05 03:45:03.345 o.a.s.d.executor [INFO] Opened spout eventhub-spout:(4)
2017-12-05 03:45:03.346 o.a.s.d.executor [INFO] Activating spout eventhub-spout:(4)
2017-12-05 03:45:09.618 o.m.d.cluster [INFO] Cluster created with settings {hosts=[toystore.documents.azure.com:10255], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='globaldb'}
2017-12-05 03:45:09.619 o.m.d.cluster [INFO] Adding discovered server toystore.documents.azure.com:10255 to client view of cluster
2017-12-05 03:45:09.629 o.a.s.util [ERROR] Async loop died!
java.lang.ExceptionInInitializerError: null
at com.mongodb.connection.InternalStreamConnectionFactory.<init>(InternalStreamConnectionFactory.java:41) ~[stormjar.jar:?]
at com.mongodb.connection.DefaultClusterableServerFactory.create(DefaultClusterableServerFactory.java:68) ~[stormjar.jar:?]
at com.mongodb.connection.BaseCluster.createServer(BaseCluster.java:360) ~[stormjar.jar:?]
at com.mongodb.connection.MultiServerCluster.addServer(MultiServerCluster.java:305) ~[stormjar.jar:?]
at com.mongodb.connection.MultiServerCluster.<init>(MultiServerCluster.java:83) ~[stormjar.jar:?]
at com.mongodb.connection.DefaultClusterFactory.create(DefaultClusterFactory.java:116) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:744) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:728) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:702) ~[stormjar.jar:?]
at com.mongodb.Mongo.<init>(Mongo.java:310) ~[stormjar.jar:?]
at com.mongodb.Mongo.<init>(Mongo.java:306) ~[stormjar.jar:?]
at com.mongodb.MongoClient.<init>(MongoClient.java:284) ~[stormjar.jar:?]
at com.microsoft.example.CosmosDBBolt.execute(CosmosDBBolt.java:108) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__9841$tuple_action_fn__9843.invoke(executor.clj:730) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__9762.invoke(executor.clj:462) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.disruptor$clojure_handler$reify__874.onEvent(disruptor.clj:40) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.daemon.executor$fn__9841$fn__9854$fn__9907.invoke(executor.clj:849) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.util$async_loop$fn__558.invoke(util.clj:484) [storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
Caused by: java.lang.NullPointerException
at com.mongodb.connection.ClientMetadataHelper.getDriverVersion(ClientMetadataHelper.java:118) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.getDriverInformation(ClientMetadataHelper.java:201) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.addDriverInformation(ClientMetadataHelper.java:182) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.<clinit>(ClientMetadataHelper.java:64) ~[stormjar.jar:?]
... 23 more
Клиент подключается к Cosmos DB, однако затем соединение обрывается.
Спасибо, Ахмад
1 ответ
Решение
На этот вопрос ответил Ларри из Microsoft на форуме MSDN. Ссылка на ответ: https://social.msdn.microsoft.com/Forums/azure/en-US/1eb4f5af-a4b7-4bab-8e3d-9dfaa736e7bd/insert-data-from-hdinsight-storm-to-azure-cosmos-db?forum=hdinsight
Вот код: https://github.com/Blackmist/hdinsight-java-storm-mongodb