Скала Акка стрим: как пройти через сек

Я пытаюсь обернуть несколько блокирующих звонков в Future. Возвращаемый тип Seq[User] где User это case class, Следующее просто не скомпилируется с жалобами на наличие различных перегруженных версий. Какие-либо предложения? Я попробовал почти все варианты Source.apply без удачи.

// All I want is Seq[User] => Future[Seq[User]]

def findByFirstName(firstName: String) = {
  val users: Seq[User] = userRepository.findByFirstName(firstName)

  val sink = Sink.fold[User, User](null)((_, elem) => elem)

  val src = Source(users) // doesn't compile

  src.runWith(sink)
}

2 ответа

Решение

Прежде всего, я предполагаю, что вы используете версию 1.0 akka-http-experimental поскольку API может измениться с предыдущего выпуска.

Причина, по которой ваш код не компилируется, заключается в том, что akka.stream.scaladsl.Source$.apply() требуетscala.collection.immutable.Seq вместо scala.collection.mutable.Seq,

Поэтому вы должны преобразовать изменяемую последовательность в неизменяемую последовательность, используя to[T] метод.

Документ: akka.stream.scaladsl.Source

Кроме того, как вы видите документ, Source$.apply() принимает ()=>Iterator[T] так что вы также можете пройти ()=>users.iterator в качестве аргумента.

поскольку Sink.fold(...) возвращает последнее вычисленное выражение, вы можете дать пустое Seq() в качестве первого аргумента, итерации по users с добавлением элемента в последовательность и, наконец, получить результат.

Тем не менее, может быть лучшее решение, которое может создать Sink который помещает каждое вычисленное выражение в Seq, но я не смог его найти.

Следующий код работает.

import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source,Sink}
import scala.concurrent.ExecutionContext.Implicits.global

case class User(name:String)

object Main extends App{
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    val users = Seq(User("alice"),User("bob"),User("charlie"))

    val sink = Sink.fold[Seq[User], User](Seq())(
      (seq, elem) => 
        {println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem})

    val src = Source(users.to[scala.collection.immutable.Seq])
    // val src = Source(()=>users.iterator) // this also works

    val fut = src.runWith(sink) // Future[Seq[User]]
    fut.onSuccess({
      case x=>{
        println(s"result => ${x}")
      }
    })
}

Вывод кода выше

elem => User(alice)     | seq => List()
elem => User(bob)       | seq => List(User(alice))
elem => User(charlie)   | seq => List(User(alice), User(bob))
result => List(User(alice), User(bob), User(charlie))

Если вам нужно просто Future[Seq[Users]], не используйте потоки akka, но фьючерсы

import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
  session.getFriends()
}
Другие вопросы по тегам