Scala/Akka: почему фьючерсы в фьючерсах работают последовательно при запуске в Akka Dispatcher
Мы наблюдали странное поведение, когда пытались запустить несколько фьючерсов из метода приема актера. Если мы используем наши сконфигурированные диспетчеры как ExecutionContext, фьючерсы запускаются в том же потоке и последовательно. Если мы используем ExecutionContext. Implicits.global, фьючерсы работают параллельно, как и ожидалось.
Мы свели код к следующему примеру (более полный пример приведен ниже):
implicit val ec = context.getDispatcher
Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }
Future {
Future{ doWork() }
Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
Future{ doWork() }
Future{ doWork() }
}
Компилируемый пример будет выглядеть так:
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}
object WhyNotParallelExperiment extends App {
val actorSystem = ActorSystem(s"Experimental")
// Futures not started in future: running in parallel
startFutures(runInFuture = false)(actorSystem.dispatcher)
Thread.sleep(5000)
// Futures started in future: running in sequentially. Why????
startFutures(runInFuture = true)(actorSystem.dispatcher)
Thread.sleep(5000)
actorSystem.terminate()
private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
if (runInFuture) {
Future{
println(s"Start Futures on thread ${Thread.currentThread().getName()}")
(1 to 9).foreach(startFuture)
println(s"Started Futures on thread ${Thread.currentThread().getName()}")
}
} else {
(11 to 19).foreach(startFuture)
}
}
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}
Мы попытались использовать оба, thread-pool-executor и fork-join-executor, с тем же результатом.
Мы неправильно используем фьючерсы? Как тогда вы должны создавать параллельные задачи?
3 ответа
Из описания внутренних акка BatchingExecutor
(выделение мое):
Mixin trait для исполнителя, который группирует несколько вложенных
Runnable.run()
вызовы в один Runnable передаются первоначальному исполнителю. Это может быть полезной оптимизацией, поскольку она обходит очередь задач исходного контекста и хранит связанный (вложенный) код в одном потоке, что может улучшить привязку к процессору. Однако, если задачи, переданные Исполнителю, являются блокирующими или дорогостоящими, эта оптимизация может предотвратить кражу работы и ухудшить производительность.... Исполнитель пакетной обработки может создать взаимоблокировки, если код не используетscala.concurrent.blocking
когда это необходимо, потому что задачи, созданные в других задачах, будут блокироваться при завершении внешней задачи.
Если вы используете диспетчер, который смешивается в BatchingExecutor
- а именно, подкласс MessageDispatcher
- Вы можете использовать scala.concurrent.blocking
Конструкция для включения параллелизма с вложенными фьючерсами:
Future {
Future {
blocking {
doBlockingWork()
}
}
}
В вашем примере вы бы добавили blocking
в startFuture
метод:
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
blocking {
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}
Пример вывода из бега startFutures(true)(actorSystem.dispatcher)
с вышеуказанным изменением:
Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
Это связано с настройкой "пропускной способности" для диспетчера. Я добавил "fair-dispatcher" в application.conf, чтобы продемонстрировать это:
fair-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 1
}
Вот ваш пример с несколькими модификациями для использования справедливого диспетчера для Futures и вывода текущего значения параметра пропускной способности:
package com.test
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}
object WhyNotParallelExperiment extends App {
val actorSystem = ActorSystem(s"Experimental")
println("Default dispatcher throughput:")
println(actorSystem.dispatchers.defaultDispatcherConfig.getInt("throughput"))
println("Fair dispatcher throughput:")
println(actorSystem.dispatchers.lookup("fair-dispatcher").configurator.config.getInt("throughput"))
// Futures not started in future: running in parallel
startFutures(runInFuture = false)(actorSystem.dispatcher)
Thread.sleep(5000)
// Futures started in future: running in sequentially. Why????
startFutures(runInFuture = true)(actorSystem.dispatcher)
Thread.sleep(5000)
actorSystem.terminate()
private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
if (runInFuture) {
Future{
implicit val fairExecutionContext = actorSystem.dispatchers.lookup("fair-dispatcher")
println(s"Start Futures on thread ${Thread.currentThread().getName()}")
(1 to 9).foreach(i => startFuture(i)(fairExecutionContext))
println(s"Started Futures on thread ${Thread.currentThread().getName()}")
}
} else {
(11 to 19).foreach(startFuture)
}
}
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}
Выход:
Default dispatcher throughput:
5
Fair dispatcher throughput:
1
Future 12 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 11 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 13 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 14 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 16 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 15 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 17 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 18 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 19 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 13 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 11 finished on thread Experimental-akka.actor.default-dispatcher-4
Future 12 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 14 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 16 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 15 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 17 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 18 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 19 finished on thread Experimental-akka.actor.default-dispatcher-10
Start Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 1 should run for 500 millis on thread Experimental-fair-dispatcher-12
Future 2 should run for 500 millis on thread Experimental-fair-dispatcher-13
Future 4 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 3 should run for 500 millis on thread Experimental-fair-dispatcher-14
Future 5 should run for 500 millis on thread Experimental-fair-dispatcher-17
Future 6 should run for 500 millis on thread Experimental-fair-dispatcher-16
Future 7 should run for 500 millis on thread Experimental-fair-dispatcher-18
Future 8 should run for 500 millis on thread Experimental-fair-dispatcher-19
Started Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 4 finished on thread Experimental-fair-dispatcher-15
Future 2 finished on thread Experimental-fair-dispatcher-13
Future 1 finished on thread Experimental-fair-dispatcher-12
Future 9 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 5 finished on thread Experimental-fair-dispatcher-17
Future 7 finished on thread Experimental-fair-dispatcher-18
Future 8 finished on thread Experimental-fair-dispatcher-19
Future 6 finished on thread Experimental-fair-dispatcher-16
Future 3 finished on thread Experimental-fair-dispatcher-14
Future 9 finished on thread Experimental-fair-dispatcher-15
Как вы можете видеть, fair-dispatcher использует разные потоки для большинства фьючерсов.
Диспетчер по умолчанию оптимизирован для участников, поэтому пропускная способность установлена на 5, чтобы минимизировать переключение контекста, чтобы повысить пропускную способность обработки сообщений при сохранении некоторой степени справедливости.
Единственное изменение в моем справедливом диспетчере - это пропускная способность: 1, т. Е. Каждому асинхронному запросу выполнения предоставляется свой собственный поток, если это возможно (до уровня параллелизма-max).
Я бы порекомендовал создать отдельных диспетчеров для фьючерсов, используемых для разных целей. Например, один диспетчер (т. Е. Пул потоков) для вызова некоторых веб-служб, другой для блокировки доступа к БД и т. Д. Это даст вам более точный контроль над ним, настроив пользовательские параметры диспетчера.
Посмотрите на https://doc.akka.io/docs/akka/current/dispatchers.html, это действительно полезно для понимания деталей.
Также ознакомьтесь с настройками ссылок Akka (в частности, диспетчера по умолчанию), там есть куча полезных комментариев: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf
После некоторых исследований я обнаружил, что Dispatcher
класс реализации akka.dispatch.BatchingExecutor
, По соображениям производительности этот класс проверяет, какие задачи следует пакетировать в одном потоке. Future.map
внутренне создает scala.concurrent.OnCompleteRunnable
который упакован в BatchingExecutor
,
Это кажется разумным для map()
/ flatMap()
где одна задача генерирует одну последующую задачу, но не для явных новых фьючерсов, которые используются для форка. Внутренне Future.apply
реализуется Future.successful().map
и, таким образом, пакетируется. Мой обходной путь теперь состоит в том, чтобы создавать фьючерсы другим способом:
object MyFuture {
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
val promise = Promise[T]()
class FuturesStarter extends Runnable {
override def run(): Unit = {
promise.complete(Try(body))
}
}
executor.execute(new FuturesStarter)
promise.future
}
}
FutureStarter
-Runnables не пакетируются и, следовательно, работают параллельно.
Кто-нибудь может подтвердить, что это решение хорошо? Есть ли лучшие способы решить эту проблему? Является ли текущая реализация Future
/ BatchingExecutor
хотел или это баг?