MongoDB Scala - 查询特定字段值的文档

MongoDB Scala - query document for a specific field value

所以我知道在 Mongo Shell 中,您可以使用点符号来获取任何文档中所需的字段。

MongoDB Scala 中的点符号是如何实现的。我对它的工作原理感到困惑。这是从集合中获取文档的代码:

val record = collection.find().projection(fields(include("offset"), excludeId())).limit(1)

编辑:

我正在尝试研究一种机制,以便在消费者关闭时基本上重新使用 Kafka 记录。为此,我将我的 kafka 记录存储在外部数据库中,然后尝试从那里获取最新的偏移量并从该点开始消费。这是应该执行此操作的我的 Scala 方法:

def getLatestCommitOffsetFromDB(collectionName: String): Long = {

import com.mongodb.Block
import org.bson.Document

val printBlock = new Block[Document]() {
  override def apply(document: Document): Unit = {
    println(document.toJson)
  }
}

import com.mongodb.async.SingleResultCallback
val callbackWhenFinished = new SingleResultCallback[Void]() {
  override def onResult(result: Void, t: Throwable): Unit = {
    System.out.println("Latest offset fetched from database.")
  }
}

var obj: String = " "

try {

  val record = collection.find().projection(fields(include("offset"), excludeId())).limit(1)
  //TODO FIND A WAY TO GET THE VALUE AND STORE IT IN A VARIABLE

} catch {
  case e: RuntimeException =>
    logger.error(s"MongoDB Server Error : Unable to fetch data from collection : $collection")
    logger.error(e.printStackTrace().toString())
}

obj.toLong

}

问题不是我可以从 Mongo 中获取文档,而是我试图访问 Mongo 中的特定字段。 Document 中有四个字段:topic、partition、message 和 offset。我想获取 "offset" 字段并将其存储在一个变量中,因此我可以将其用作重新使用 Kafka 记录的重新启动点。

我从那里去哪里?

POM.xml

<?xml version="1.0" encoding="UTF-8"?>

http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0

<groupId>OffsetManagementPoC</groupId>
<artifactId>OffsetManagementPoC</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah_2.12</artifactId>
        <version>3.1.1</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-driver_2.12</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-driver_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>bson</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-async</artifactId>
        <version>3.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-bson_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

根据文档,collection.find() 接受 com.mongodb.DBObject

您可以使用的该接口的实现之一是 BasicDBObject,它基本上类似于 mutable.Map[String, Object]。您可以使用接受如下地图的构造函数:

val query = new com.mongodb.BasicDBObject(Map(
  "foo.bar" -> "value1"
  "bar.foo" -> "value2"
))
val record = collection.find(query)....

您可以这样修改查询:

import com.mongodb.MongoClient
import com.mongodb.client.MongoCollection
import com.mongodb.client.model.Projections

def getLatestCommitOffsetFromDB(
  databaseName: String,
  collectionName: String
): Long = {

  val mongoClient = new MongoClient("localhost", 27017);

  val collection =
    mongoClient.getDatabase(databaseName).getCollection(collectionName)

  val record = collection
    .find()
    .projection(
      Projections
        .fields(Projections.include("offset"), Projections.excludeId()))
    .first

  record.get("offset").asInstanceOf[Double].toLong
}

我认为您缺少 com.mongodb.client.model.Projections 导入以便使用 fieldsincludeexcludeId

我使用 first 而不是 limit(1) 以便更容易提取结果。

first returns 一个 Document 对象,您可以在其上调用 get 来检索所请求字段的值。

但其实只要一条记录和一个字段,可以去掉投影!:

val record = collection.find().first