Будущее в обратном вызове запускается в том же потоке

Когда я запускаю этот код:

val intList = List(1, 6, 8)

val listOfFutures = intList.map{
  i => future {
    println("future " +i)
    Thread.sleep(i* 1000)
    if (i == 12)
      throw new Exception("6 is not legal.")
    i
  }

}
val futureOfList = Future.sequence(listOfFutures)

futureOfList onSuccess{
  case int => {
    logInfo("onSuccess")
    int.foreach(i => {
      Future{
      logInfo("h : " + i)
      }
    })
  }
}

futureOfList onFailure{
  case int => {
    logInfo("onFailure")
  }
}

logInfo("after")

Вложенные фьючерсы работают с одной и той же нитью:

future 1
2013/11/24 11:55:00.002 [ModelUtilsTest] [main]: INFO: [] after
future 6
future 8
2013/11/24 11:55:14.878 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] onSuccess 
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 1 
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 6 
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 8

Почему это случилось? мне нужно внутреннее будущее, чтобы идти параллельно.

2 ответа

Решение

Future реализация проверяет, будет ли он запускать что-то на ForkJoinPool которому принадлежит текущий поток. В этом случае это будет ForkJoinTask.fork(), который добавляет новый рабочий блок в рабочую очередь текущего рабочего потока. Любой другой поток должен украсть эту работу, чтобы выполнить ее. Вы не ожидаете увидеть равномерное распределение ваших задач по доступным потокам, особенно с простыми задачами.

В следующем запуске worker-9 несет главный удар

scala> val f = Future sequence (1 to 20 map (i => Future(i)))
f: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = scala.concurrent.impl.Promise$DefaultPromise@47fe4ff8

scala> f onSuccess { case is => is foreach (i => Future(println(s"$i on ${Thread.currentThread}"))) }

scala> 2 on Thread[ForkJoinPool-1-worker-9,5,main]
5 on Thread[ForkJoinPool-1-worker-15,5,main]
4 on Thread[ForkJoinPool-1-worker-3,5,main]
6 on Thread[ForkJoinPool-1-worker-9,5,main]
1 on Thread[ForkJoinPool-1-worker-5,5,main]
12 on Thread[ForkJoinPool-1-worker-7,5,main]
11 on Thread[ForkJoinPool-1-worker-9,5,main]
15 on Thread[ForkJoinPool-1-worker-9,5,main]
16 on Thread[ForkJoinPool-1-worker-9,5,main]
17 on Thread[ForkJoinPool-1-worker-9,5,main]
18 on Thread[ForkJoinPool-1-worker-9,5,main]
19 on Thread[ForkJoinPool-1-worker-9,5,main]
20 on Thread[ForkJoinPool-1-worker-9,5,main]
3 on Thread[ForkJoinPool-1-worker-1,5,main]
10 on Thread[ForkJoinPool-1-worker-13,5,main]
9 on Thread[ForkJoinPool-1-worker-11,5,main]
8 on Thread[ForkJoinPool-1-worker-3,5,main]
7 on Thread[ForkJoinPool-1-worker-15,5,main]
14 on Thread[ForkJoinPool-1-worker-7,5,main]
13 on Thread[ForkJoinPool-1-worker-5,5,main]

У вас есть два способа легко достичь параллелизма:

Во-первых, вы можете использовать traverse метод из Future сопутствующий объект.

import scala.collection.breakOut
import scala.concurrent.Future;
import scala.concurrent.ExecutionContext.Implicits.global

val modifiedList = Future.traverse(myList)(myFunc)

Или вы можете использовать par коллекции, которые запускают отображение параллельно.

val modifiedList = myList.par.map(myFunc)(breakOut)
Другие вопросы по тегам