Скала Акка стрим: как пройти через сек
Я пытаюсь обернуть несколько блокирующих звонков в Future
. Возвращаемый тип Seq[User]
где User
это case class
, Следующее просто не скомпилируется с жалобами на наличие различных перегруженных версий. Какие-либо предложения? Я попробовал почти все варианты Source.apply
без удачи.
// All I want is Seq[User] => Future[Seq[User]]
def findByFirstName(firstName: String) = {
val users: Seq[User] = userRepository.findByFirstName(firstName)
val sink = Sink.fold[User, User](null)((_, elem) => elem)
val src = Source(users) // doesn't compile
src.runWith(sink)
}
2 ответа
Прежде всего, я предполагаю, что вы используете версию 1.0 akka-http-experimental
поскольку API может измениться с предыдущего выпуска.
Причина, по которой ваш код не компилируется, заключается в том, что akka.stream.scaladsl.Source$.apply()
требуетscala.collection.immutable.Seq
вместо scala.collection.mutable.Seq
,
Поэтому вы должны преобразовать изменяемую последовательность в неизменяемую последовательность, используя to[T]
метод.
Документ: akka.stream.scaladsl.Source
Кроме того, как вы видите документ, Source$.apply()
принимает ()=>Iterator[T]
так что вы также можете пройти ()=>users.iterator
в качестве аргумента.
поскольку Sink.fold(...)
возвращает последнее вычисленное выражение, вы можете дать пустое Seq()
в качестве первого аргумента, итерации по users
с добавлением элемента в последовательность и, наконец, получить результат.
Тем не менее, может быть лучшее решение, которое может создать Sink
который помещает каждое вычисленное выражение в Seq
, но я не смог его найти.
Следующий код работает.
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source,Sink}
import scala.concurrent.ExecutionContext.Implicits.global
case class User(name:String)
object Main extends App{
implicit val system = ActorSystem("MyActorSystem")
implicit val materializer = ActorMaterializer()
val users = Seq(User("alice"),User("bob"),User("charlie"))
val sink = Sink.fold[Seq[User], User](Seq())(
(seq, elem) =>
{println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem})
val src = Source(users.to[scala.collection.immutable.Seq])
// val src = Source(()=>users.iterator) // this also works
val fut = src.runWith(sink) // Future[Seq[User]]
fut.onSuccess({
case x=>{
println(s"result => ${x}")
}
})
}
Вывод кода выше
elem => User(alice) | seq => List()
elem => User(bob) | seq => List(User(alice))
elem => User(charlie) | seq => List(User(alice), User(bob))
result => List(User(alice), User(bob), User(charlie))
Если вам нужно просто Future[Seq[Users]], не используйте потоки akka, но фьючерсы
import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
session.getFriends()
}