Принудительный AsyncStream[A] в Seq[A] в Scala

Я специально использую Twitter AsyncStream, и мне нужно взять результаты параллельной обработки и превратить их в Seq, но единственный способ, которым мне удалось получить работу, ужасен. Такое ощущение, что это должен быть один лайнер, но все мои попытки с Await и force либо зависли, либо не обработали работу.

Вот над чем я работаю - какой идиоматичный способ сделать это?

  def processWork(work: AsyncStream[Work]): Seq[Result] = {
    // TODO: make less stupid
    val resultStream = work.flatMap { processWork }
    var results : Seq[Result] = Nil
    resultStream.foreach {
      result => {
        results = results :+ result
      }
    }
    results
  }

1 ответ

Решение

Как указывалось @MattFowler - вы можете форсировать поток и ждать его завершения, используя:

Await.result(resultStream.toSeq, 1.second)

toSeq начнет реализацию потока и вернет Future[Seq[A]] это завершается, как только все элементы разрешены, как описано здесь.

Затем вы можете заблокировать, пока будущее не завершится с помощью Await.result,

Убедитесь, что ваш поток конечен! призвание Await.result(resultStream.toSeq, 1.second) будет висеть вечно.

Другие вопросы по тегам