Alpakka MongoDB - укажите тип в MongoSource
В настоящее время я играю с Akka Streams и разъемом Alpakka MongoDB.
Можно ли указать тип для MongoSource
?
val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
private val todoCollection: MongoCollection[TodoMongo] = mongoDb
.withCodecRegistry(codecRegistry)
.getCollection("todo")
Я хотел бы сделать что-то вроде этого:
val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here
Но я получаю следующую ошибку:
Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].
Я не могу найти правильную документацию по этой части.
1 ответ
Это еще не опубликовано, но в главной ветке Alpakka, MongoSource.apply
принимает параметр типа:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Таким образом, с выходом 0.18 Alpakka вы сможете сделать следующее:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Обратите внимание, что source
здесь предполагается, что todoCollection.find()
возвращает Observable[TodoMongo]
; настроить типы по мере необходимости.
А пока вы можете просто добавить приведенный выше код вручную. Например:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Обратите внимание, что MyMongoSource
определяется для проживания в akka.stream.alpakka.mongodb.scaladsl
пакет (как MongoSource
), так как ObservableToPublisher
это пакет закрытого класса. Вы бы использовали MyMongoSource
так же, как вы бы использовали MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())