如何检索新文档并将其添加到 MongoDb 集合中抛出 com.mongodb.reactivestreams.client.MongoClient
how retrieve and add new document to MongoDb collection throw com.mongodb.reactivestreams.client.MongoClient
上下文:我编写了一个接收简单消息的 Kafka 消费者,我想使用 com.mongodb.reactivestreams.client.MongoClient 将其插入到 MongoDb。虽然我明白我的问题是关于如何正确使用 MongoClient 让我通知我的堆栈:我的堆栈是 Micronaut + MongoDb reactive + Kotlin.
免责声明:如果有人在 java 中提供答案,我可以将其翻译成 Kotlin。您可以忽略下面的 Kafka 部分,因为它按预期工作。
这是我的代码
package com.mybank.consumer
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import com.mongodb.reactivestreams.client.MongoDatabase
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import org.bson.Document
import org.reactivestreams.Publisher
import javax.inject.Inject
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {
@Inject
//@Named("another")
var mongoClient: MongoClient? = null
@Topic("debit")
fun receive(@KafkaKey key: String, name: String) {
println("Account - $name by $key")
var mongoDb : MongoDatabase? = mongoClient?.getDatabase("account")
var mongoCollection: MongoCollection<Document>? = mongoDb?.getCollection("account_collection")
var mongoDocument: Publisher<Document>? = mongoCollection?.find()?.first()
print(mongoDocument.toString())
//println(mongoClient?.getDatabase("account")?.getCollection("account_collection")?.find()?.first())
//val mongoClientClient: MongoDatabase = mongoClient.getDatabase("account")
//println(mongoClient.getDatabase("account").getCollection("account_collection").find({ "size.h": { $lt: 15 } })
//println(mongoClient.getDatabase("account").getCollection("account_collection").find("1").toString())
}
}
好吧,上面的代码是我得到的最接近的代码。它没有提示任何错误。正在打印
com.mongodb.reactivestreams.client.internal.Publishers$$Lambda8/0x0000000800525840@437ec11
我想这证明代码已正确连接到数据库,但我希望打印第一个文档。
共有三个文件:
我的最终目标是将从 Kafka Listener 收到的消息插入到 MongoDb。任何线索将不胜感激。
完整代码见git hub
*** 在 Susan 的问题后编辑
这是打印的
var mongoDocument = mongoCollection?.find()?.first()
print(mongoDocument.toString())
看起来您正在为 mongodb 使用反应流。您使用反应流有什么原因吗?
您得到的结果类型为“Publisher”。您将需要使用 subscribe() 方法来获取文档。
请参阅有关 Publisher 的文档。
http://www.howsoftworks.net/reacstre/1.0.2/Publisher
如果您不想使用反应式:how/what 上很好的例子,可以在 Kotlin 中用于 mongodb。
https://kb.objectrocket.com/mongo-db/retrieve-mongodb-document-using-kotlin-1180
--- 类似 StackOverlow,使用 MongoDB、Reactive Streams、Publisher。
how save document to MongoDb with com.mongodb.reactivestreams.client
=============== 已编辑 ==============
Publisher<Document> publisher = collection.find().first();
subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber); //publisher.subscribe(subscriber)
subscriber.await();
The example will print the following document:
{ "_id" : { "$oid" : "551582c558c7b4fbacf16735" },
"name" : "MongoDB", "type" : "database", "count" : 1,
}
如果你想要非阻塞,这样做:
publisher.subscribe(new PrintDocumentSubscriber()); //without await
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.6/getting-started/quick-tour/
上下文:我编写了一个接收简单消息的 Kafka 消费者,我想使用 com.mongodb.reactivestreams.client.MongoClient 将其插入到 MongoDb。虽然我明白我的问题是关于如何正确使用 MongoClient 让我通知我的堆栈:我的堆栈是 Micronaut + MongoDb reactive + Kotlin.
免责声明:如果有人在 java 中提供答案,我可以将其翻译成 Kotlin。您可以忽略下面的 Kafka 部分,因为它按预期工作。
这是我的代码
package com.mybank.consumer
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import com.mongodb.reactivestreams.client.MongoDatabase
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import org.bson.Document
import org.reactivestreams.Publisher
import javax.inject.Inject
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {
@Inject
//@Named("another")
var mongoClient: MongoClient? = null
@Topic("debit")
fun receive(@KafkaKey key: String, name: String) {
println("Account - $name by $key")
var mongoDb : MongoDatabase? = mongoClient?.getDatabase("account")
var mongoCollection: MongoCollection<Document>? = mongoDb?.getCollection("account_collection")
var mongoDocument: Publisher<Document>? = mongoCollection?.find()?.first()
print(mongoDocument.toString())
//println(mongoClient?.getDatabase("account")?.getCollection("account_collection")?.find()?.first())
//val mongoClientClient: MongoDatabase = mongoClient.getDatabase("account")
//println(mongoClient.getDatabase("account").getCollection("account_collection").find({ "size.h": { $lt: 15 } })
//println(mongoClient.getDatabase("account").getCollection("account_collection").find("1").toString())
}
}
好吧,上面的代码是我得到的最接近的代码。它没有提示任何错误。正在打印
com.mongodb.reactivestreams.client.internal.Publishers$$Lambda8/0x0000000800525840@437ec11
我想这证明代码已正确连接到数据库,但我希望打印第一个文档。
共有三个文件:
我的最终目标是将从 Kafka Listener 收到的消息插入到 MongoDb。任何线索将不胜感激。
完整代码见git hub
*** 在 Susan 的问题后编辑
这是打印的
var mongoDocument = mongoCollection?.find()?.first()
print(mongoDocument.toString())
看起来您正在为 mongodb 使用反应流。您使用反应流有什么原因吗?
您得到的结果类型为“Publisher”。您将需要使用 subscribe() 方法来获取文档。
请参阅有关 Publisher 的文档。
http://www.howsoftworks.net/reacstre/1.0.2/Publisher
如果您不想使用反应式:how/what 上很好的例子,可以在 Kotlin 中用于 mongodb。
https://kb.objectrocket.com/mongo-db/retrieve-mongodb-document-using-kotlin-1180
--- 类似 StackOverlow,使用 MongoDB、Reactive Streams、Publisher。
how save document to MongoDb with com.mongodb.reactivestreams.client
=============== 已编辑 ==============
Publisher<Document> publisher = collection.find().first();
subscriber = new PrintDocumentSubscriber();
publisher.subscribe(subscriber); //publisher.subscribe(subscriber)
subscriber.await();
The example will print the following document:
{ "_id" : { "$oid" : "551582c558c7b4fbacf16735" },
"name" : "MongoDB", "type" : "database", "count" : 1,
}
如果你想要非阻塞,这样做:
publisher.subscribe(new PrintDocumentSubscriber()); //without await
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.6/getting-started/quick-tour/