Scala/Akka: почему фьючерсы в фьючерсах работают последовательно при запуске в Akka Dispatcher

Мы наблюдали странное поведение, когда пытались запустить несколько фьючерсов из метода приема актера. Если мы используем наши сконфигурированные диспетчеры как ExecutionContext, фьючерсы запускаются в том же потоке и последовательно. Если мы используем ExecutionContext. Implicits.global, фьючерсы работают параллельно, как и ожидалось.

Мы свели код к следующему примеру (более полный пример приведен ниже):

implicit val ec = context.getDispatcher

Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }

Future {
   Future{ doWork() } 
   Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
   Future{ doWork() }
   Future{ doWork() }
}

Компилируемый пример будет выглядеть так:

import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")   

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running in sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(startFuture)
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }


}

Мы попытались использовать оба, thread-pool-executor и fork-join-executor, с тем же результатом.

Мы неправильно используем фьючерсы? Как тогда вы должны создавать параллельные задачи?

3 ответа

Решение

Из описания внутренних акка BatchingExecutor (выделение мое):

Mixin trait для исполнителя, который группирует несколько вложенных Runnable.run() вызовы в один Runnable передаются первоначальному исполнителю. Это может быть полезной оптимизацией, поскольку она обходит очередь задач исходного контекста и хранит связанный (вложенный) код в одном потоке, что может улучшить привязку к процессору. Однако, если задачи, переданные Исполнителю, являются блокирующими или дорогостоящими, эта оптимизация может предотвратить кражу работы и ухудшить производительность.... Исполнитель пакетной обработки может создать взаимоблокировки, если код не использует scala.concurrent.blocking когда это необходимо, потому что задачи, созданные в других задачах, будут блокироваться при завершении внешней задачи.

Если вы используете диспетчер, который смешивается в BatchingExecutor - а именно, подкласс MessageDispatcher - Вы можете использовать scala.concurrent.blocking Конструкция для включения параллелизма с вложенными фьючерсами:

Future {
  Future {
    blocking {
      doBlockingWork()
    }
  }
}

В вашем примере вы бы добавили blocking в startFuture метод:

private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
  blocking {
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

Пример вывода из бега startFutures(true)(actorSystem.dispatcher) с вышеуказанным изменением:

Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4

Это связано с настройкой "пропускной способности" для диспетчера. Я добавил "fair-dispatcher" в application.conf, чтобы продемонстрировать это:

fair-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 1
}

Вот ваш пример с несколькими модификациями для использования справедливого диспетчера для Futures и вывода текущего значения параметра пропускной способности:

package com.test

import akka.actor.ActorSystem

import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")

  println("Default dispatcher throughput:")
  println(actorSystem.dispatchers.defaultDispatcherConfig.getInt("throughput"))

  println("Fair dispatcher throughput:")
  println(actorSystem.dispatchers.lookup("fair-dispatcher").configurator.config.getInt("throughput"))

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running in sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        implicit val fairExecutionContext = actorSystem.dispatchers.lookup("fair-dispatcher")
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(i => startFuture(i)(fairExecutionContext))
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }
}

Выход:

Default dispatcher throughput:
5
Fair dispatcher throughput:
1
Future 12 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 11 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 13 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 14 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 16 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 15 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 17 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 18 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 19 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 13 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 11 finished on thread Experimental-akka.actor.default-dispatcher-4
Future 12 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 14 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 16 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 15 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 17 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 18 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 19 finished on thread Experimental-akka.actor.default-dispatcher-10
Start Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 1 should run for 500 millis on thread Experimental-fair-dispatcher-12
Future 2 should run for 500 millis on thread Experimental-fair-dispatcher-13
Future 4 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 3 should run for 500 millis on thread Experimental-fair-dispatcher-14
Future 5 should run for 500 millis on thread Experimental-fair-dispatcher-17
Future 6 should run for 500 millis on thread Experimental-fair-dispatcher-16
Future 7 should run for 500 millis on thread Experimental-fair-dispatcher-18
Future 8 should run for 500 millis on thread Experimental-fair-dispatcher-19
Started Futures on thread Experimental-akka.actor.default-dispatcher-10
Future 4 finished on thread Experimental-fair-dispatcher-15
Future 2 finished on thread Experimental-fair-dispatcher-13
Future 1 finished on thread Experimental-fair-dispatcher-12
Future 9 should run for 500 millis on thread Experimental-fair-dispatcher-15
Future 5 finished on thread Experimental-fair-dispatcher-17
Future 7 finished on thread Experimental-fair-dispatcher-18
Future 8 finished on thread Experimental-fair-dispatcher-19
Future 6 finished on thread Experimental-fair-dispatcher-16
Future 3 finished on thread Experimental-fair-dispatcher-14
Future 9 finished on thread Experimental-fair-dispatcher-15

Как вы можете видеть, fair-dispatcher использует разные потоки для большинства фьючерсов.

Диспетчер по умолчанию оптимизирован для участников, поэтому пропускная способность установлена ​​на 5, чтобы минимизировать переключение контекста, чтобы повысить пропускную способность обработки сообщений при сохранении некоторой степени справедливости.

Единственное изменение в моем справедливом диспетчере - это пропускная способность: 1, т. Е. Каждому асинхронному запросу выполнения предоставляется свой собственный поток, если это возможно (до уровня параллелизма-max).

Я бы порекомендовал создать отдельных диспетчеров для фьючерсов, используемых для разных целей. Например, один диспетчер (т. Е. Пул потоков) для вызова некоторых веб-служб, другой для блокировки доступа к БД и т. Д. Это даст вам более точный контроль над ним, настроив пользовательские параметры диспетчера.

Посмотрите на https://doc.akka.io/docs/akka/current/dispatchers.html, это действительно полезно для понимания деталей.

Также ознакомьтесь с настройками ссылок Akka (в частности, диспетчера по умолчанию), там есть куча полезных комментариев: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf

После некоторых исследований я обнаружил, что Dispatcher класс реализации akka.dispatch.BatchingExecutor, По соображениям производительности этот класс проверяет, какие задачи следует пакетировать в одном потоке. Future.map внутренне создает scala.concurrent.OnCompleteRunnable который упакован в BatchingExecutor,

Это кажется разумным для map() / flatMap() где одна задача генерирует одну последующую задачу, но не для явных новых фьючерсов, которые используются для форка. Внутренне Future.apply реализуется Future.successful().map и, таким образом, пакетируется. Мой обходной путь теперь состоит в том, чтобы создавать фьючерсы другим способом:

object MyFuture {
  def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    class FuturesStarter extends Runnable {
      override def run(): Unit = {
        promise.complete(Try(body))
      }
    }
    executor.execute(new FuturesStarter)
    promise.future
  }
}

FutureStarter-Runnables не пакетируются и, следовательно, работают параллельно.

Кто-нибудь может подтвердить, что это решение хорошо? Есть ли лучшие способы решить эту проблему? Является ли текущая реализация Future / BatchingExecutor хотел или это баг?

Другие вопросы по тегам