Alpakka MongoDB - укажите тип в MongoSource

В настоящее время я играю с Akka Streams и разъемом Alpakka MongoDB.

Можно ли указать тип для MongoSource?

val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
  private val todoCollection: MongoCollection[TodoMongo] = mongoDb
    .withCodecRegistry(codecRegistry)
    .getCollection("todo")

Я хотел бы сделать что-то вроде этого:

val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here

Но я получаю следующую ошибку:

Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].

Я не могу найти правильную документацию по этой части.

1 ответ

Решение

Это еще не опубликовано, но в главной ветке Alpakka, MongoSource.apply принимает параметр типа:

object MongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

Таким образом, с выходом 0.18 Alpakka вы сможете сделать следующее:

val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())

Обратите внимание, что source здесь предполагается, что todoCollection.find() возвращает Observable[TodoMongo]; настроить типы по мере необходимости.

А пока вы можете просто добавить приведенный выше код вручную. Например:

package akka.stream.alpakka.mongodb.scaladsl

import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable

object MyMongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

Обратите внимание, что MyMongoSource определяется для проживания в akka.stream.alpakka.mongodb.scaladsl пакет (как MongoSource), так как ObservableToPublisherэто пакет закрытого класса. Вы бы использовали MyMongoSource так же, как вы бы использовали MongoSource:

val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find()) 
Другие вопросы по тегам