Twitter's Future.collect не работает одновременно (Scala)

Исходя из фона node.js, я новичок в Scala и попытался использовать Twitter's Future.collect для выполнения нескольких простых параллельных операций. Но мой код показывает последовательное поведение, а не параллельное поведение. Что я делаю неправильно?

Вот мой код,

import com.twitter.util.Future

def waitForSeconds(seconds: Int, container:String): Future[String]  = Future[String] {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

def mainFunction:String = {
  val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All")))
  val singleTask = waitForSeconds(1, "Single")

  allTasks onSuccess  { res =>
    println("All tasks succeeded with result " + res)
  }

  singleTask onSuccess { res =>
    println("Single task succeeded with result " + res)
  }

  "Function Complete"
}

println(mainFunction)

и это вывод, который я получаю,

All: done waiting for 1 seconds
All: done waiting for 3 seconds
All: done waiting for 2 seconds
Single: done waiting for 1 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete

Результат, который я ожидаю,

All: done waiting for 1 seconds
Single: done waiting for 1 seconds
All: done waiting for 2 seconds
All: done waiting for 3 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete

1 ответ

Решение

Фьючерсы в Твиттере более четко показывают, где выполняются вычисления, чем фьючерсы стандартной библиотеки Scala. Особенно, Future.apply будет захватывать исключения безопасно (как s.c.Future), но в нем ничего не говорится о том, в каком потоке будут выполняться вычисления. В вашем случае вычисления выполняются в основном потоке, поэтому вы видите результаты, которые видите.

Этот подход имеет несколько преимуществ перед будущим API стандартной библиотеки. С одной стороны, это упрощает сигнатуры методов, поскольку не существует неявного ExecutionContext это должно быть распространено повсюду. Что еще более важно, это облегчает избегание переключений контекста ( вот классическое объяснение Брайана Дегенхардта). В этом отношении твиттера Future больше похоже на Скалаза Task и имеет практически те же преимущества в производительности (описано, например, в этом блоге).

Недостаток более явного понимания того, где выполняются вычисления, заключается в том, что вам нужно более четко указывать, где выполняются вычисления. В вашем случае вы могли бы написать что-то вроде этого:

import com.twitter.util.{ Future, FuturePool }

val pool = FuturePool.unboundedPool

def waitForSeconds(seconds: Int, container:String): Future[String] = pool {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

Это не будет производить именно тот результат, который вы запрашиваете ("Функция завершена" будет напечатана первой, и allTasks а также singleTask не упорядочены по отношению друг к другу), но он будет выполнять задачи параллельно в отдельных потоках.

(В качестве сноски: FuturePool.unboundedPool в приведенном выше примере это простой способ создания будущего пула для демонстрации, который часто просто хорош, но не подходит для вычислений, интенсивно использующих процессор - см. FuturePool API документы для других способов создания будущего пула, который будет использовать ExecutorService что вы предоставляете и можете сами управлять собой.)

Другие вопросы по тегам