MongoDB Scala - запросить документ для определенного значения поля

Итак, я знаю, что в Mongo Shell вы используете точечную запись, чтобы получить нужное поле в любом документе.

Как достигается точечная нотация в MongoDB Scala. Я не понимаю, как это работает. Вот код, который выбирает документ из коллекции:

val record = collection.find().projection(fields(include("offset"), excludeId())).limit(1)

РЕДАКТИРОВАТЬ:

Я пытаюсь поработать над механизмом повторного использования записей Kafka в тот момент, когда потребитель был отключен. Для этого я сохраняю свои записи kafka во внешней базе данных, а затем пытаюсь получить самое последнее смещение оттуда и начать потребление с этой точки. Вот мой метод Scala, который должен это сделать:

def getLatestCommitOffsetFromDB(collectionName: String): Long = {

import com.mongodb.Block
import org.bson.Document

val printBlock = new Block[Document]() {
  override def apply(document: Document): Unit = {
    println(document.toJson)
  }
}

import com.mongodb.async.SingleResultCallback
val callbackWhenFinished = new SingleResultCallback[Void]() {
  override def onResult(result: Void, t: Throwable): Unit = {
    System.out.println("Latest offset fetched from database.")
  }
}

var obj: String = " "

try {

  val record = collection.find().projection(fields(include("offset"), excludeId())).limit(1)
  //TODO FIND A WAY TO GET THE VALUE AND STORE IT IN A VARIABLE

} catch {
  case e: RuntimeException =>
    logger.error(s"MongoDB Server Error : Unable to fetch data from collection : $collection")
    logger.error(e.printStackTrace().toString())
}

obj.toLong

}

Проблема не в том, что я могу получать документы из Mongo, более того, я пытаюсь получить доступ к определенному полю в Mongo. В документе есть четыре поля: тема, раздел, сообщение и смещение. Я хочу получить поле "смещение" и сохранить его в переменной, чтобы использовать его в качестве точки перезапуска для повторного использования записей Кафки.

куда мне идти оттуда?

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

http://maven.apache.org/xsd/maven-4.0.0.xsd "> 4.0.0

<groupId>OffsetManagementPoC</groupId>
<artifactId>OffsetManagementPoC</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah_2.12</artifactId>
        <version>3.1.1</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-driver_2.12</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-driver_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>bson</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-async</artifactId>
        <version>3.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-bson_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

2 ответа

Решение

Вы можете изменить свой запрос следующим образом:

import com.mongodb.MongoClient
import com.mongodb.client.MongoCollection
import com.mongodb.client.model.Projections

def getLatestCommitOffsetFromDB(
  databaseName: String,
  collectionName: String
): Long = {

  val mongoClient = new MongoClient("localhost", 27017);

  val collection =
    mongoClient.getDatabase(databaseName).getCollection(collectionName)

  val record = collection
    .find()
    .projection(
      Projections
        .fields(Projections.include("offset"), Projections.excludeId()))
    .first

  record.get("offset").asInstanceOf[Double].toLong
}

Я думаю, что вы пропустили com.mongodb.client.model.Projections импорт для использования fields, include а также excludeId

я использовал first вместо limit(1) чтобы было проще извлечь результат.

first возвращает Document объект, по которому можно позвонить get получить значение запрошенного поля.

Но на самом деле, поскольку вам нужна только одна запись и одно поле, вы можете удалить проекцию!:

val record = collection.find().first

Согласно документации, collection.find() принимает com.mongodb.DBObject

Одна из реализаций этого интерфейса, которую вы можете использовать, BasicDBObject который в основном похож на mutable.Map[String, Object], Вы можете использовать конструктор, который принимает карту как:

val query = new com.mongodb.BasicDBObject(Map(
  "foo.bar" -> "value1"
  "bar.foo" -> "value2"
))
val record = collection.find(query)....
Другие вопросы по тегам