Будущее в обратном вызове запускается в том же потоке
Когда я запускаю этот код:
val intList = List(1, 6, 8)
val listOfFutures = intList.map{
i => future {
println("future " +i)
Thread.sleep(i* 1000)
if (i == 12)
throw new Exception("6 is not legal.")
i
}
}
val futureOfList = Future.sequence(listOfFutures)
futureOfList onSuccess{
case int => {
logInfo("onSuccess")
int.foreach(i => {
Future{
logInfo("h : " + i)
}
})
}
}
futureOfList onFailure{
case int => {
logInfo("onFailure")
}
}
logInfo("after")
Вложенные фьючерсы работают с одной и той же нитью:
future 1
2013/11/24 11:55:00.002 [ModelUtilsTest] [main]: INFO: [] after
future 6
future 8
2013/11/24 11:55:14.878 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] onSuccess
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 1
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 6
2013/11/24 11:55:15.378 [ModelUtilsTest] [ForkJoinPool-1-worker-1]: INFO: [] h : 8
Почему это случилось? мне нужно внутреннее будущее, чтобы идти параллельно.
2 ответа
Future
реализация проверяет, будет ли он запускать что-то на ForkJoinPool
которому принадлежит текущий поток. В этом случае это будет ForkJoinTask.fork()
, который добавляет новый рабочий блок в рабочую очередь текущего рабочего потока. Любой другой поток должен украсть эту работу, чтобы выполнить ее. Вы не ожидаете увидеть равномерное распределение ваших задач по доступным потокам, особенно с простыми задачами.
В следующем запуске worker-9
несет главный удар
scala> val f = Future sequence (1 to 20 map (i => Future(i)))
f: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = scala.concurrent.impl.Promise$DefaultPromise@47fe4ff8
scala> f onSuccess { case is => is foreach (i => Future(println(s"$i on ${Thread.currentThread}"))) }
scala> 2 on Thread[ForkJoinPool-1-worker-9,5,main]
5 on Thread[ForkJoinPool-1-worker-15,5,main]
4 on Thread[ForkJoinPool-1-worker-3,5,main]
6 on Thread[ForkJoinPool-1-worker-9,5,main]
1 on Thread[ForkJoinPool-1-worker-5,5,main]
12 on Thread[ForkJoinPool-1-worker-7,5,main]
11 on Thread[ForkJoinPool-1-worker-9,5,main]
15 on Thread[ForkJoinPool-1-worker-9,5,main]
16 on Thread[ForkJoinPool-1-worker-9,5,main]
17 on Thread[ForkJoinPool-1-worker-9,5,main]
18 on Thread[ForkJoinPool-1-worker-9,5,main]
19 on Thread[ForkJoinPool-1-worker-9,5,main]
20 on Thread[ForkJoinPool-1-worker-9,5,main]
3 on Thread[ForkJoinPool-1-worker-1,5,main]
10 on Thread[ForkJoinPool-1-worker-13,5,main]
9 on Thread[ForkJoinPool-1-worker-11,5,main]
8 on Thread[ForkJoinPool-1-worker-3,5,main]
7 on Thread[ForkJoinPool-1-worker-15,5,main]
14 on Thread[ForkJoinPool-1-worker-7,5,main]
13 on Thread[ForkJoinPool-1-worker-5,5,main]
У вас есть два способа легко достичь параллелизма:
Во-первых, вы можете использовать traverse
метод из Future
сопутствующий объект.
import scala.collection.breakOut
import scala.concurrent.Future;
import scala.concurrent.ExecutionContext.Implicits.global
val modifiedList = Future.traverse(myList)(myFunc)
Или вы можете использовать par
коллекции, которые запускают отображение параллельно.
val modifiedList = myList.par.map(myFunc)(breakOut)