Принудительный AsyncStream[A] в Seq[A] в Scala
Я специально использую Twitter AsyncStream, и мне нужно взять результаты параллельной обработки и превратить их в Seq, но единственный способ, которым мне удалось получить работу, ужасен. Такое ощущение, что это должен быть один лайнер, но все мои попытки с Await и force либо зависли, либо не обработали работу.
Вот над чем я работаю - какой идиоматичный способ сделать это?
def processWork(work: AsyncStream[Work]): Seq[Result] = {
// TODO: make less stupid
val resultStream = work.flatMap { processWork }
var results : Seq[Result] = Nil
resultStream.foreach {
result => {
results = results :+ result
}
}
results
}
1 ответ
Как указывалось @MattFowler - вы можете форсировать поток и ждать его завершения, используя:
Await.result(resultStream.toSeq, 1.second)
toSeq
начнет реализацию потока и вернет Future[Seq[A]]
это завершается, как только все элементы разрешены, как описано здесь.
Затем вы можете заблокировать, пока будущее не завершится с помощью Await.result
,
Убедитесь, что ваш поток конечен! призвание Await.result(resultStream.toSeq, 1.second)
будет висеть вечно.