Операция блокировки в Actor НЕ занимает все диспетчеры по умолчанию

Я изучаю Akka Actor недавно. Я прочитал документ диспетчеров в Actor. Мне любопытно по поводу операции блокировки у актера. Последняя тема в документе описывает, как решить проблему. И я пытаюсь воспроизвести пример эксперимента в документе.

Вот мой код:

package dispatcher

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Main extends App{

  var config = ConfigFactory.parseString(
    """
      |my-dispatcher{
      |type = Dispatcher
      |
      |executor = "fork-join-executor"
      |
      |fork-join-executor{
      |fixed-pool-size = 32
      |}
      |throughput = 1
      |}
    """.stripMargin)

//  val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))


  val system = ActorSystem("block")


  val actor1 = system.actorOf(Props(new BlockingFutureActor()))
  val actor2 = system.actorOf(Props(new PrintActor()))

  for(i <- 1 to 1000){
    actor1 ! i
    actor2 ! i
  }

}

package dispatcher

import akka.actor.Actor

import scala.concurrent.{ExecutionContext, Future}

class BlockingFutureActor extends Actor{
  override def receive: Receive = {
    case i: Int =>
      Thread.sleep(5000)
      implicit val excutionContext: ExecutionContext = context.dispatcher
      Future {
        Thread.sleep(5000)
        println(s"Blocking future finished ${i}")
      }
  }
}
package dispatcher

import akka.actor.Actor

class PrintActor extends Actor{
  override def receive: Receive = {
    case i: Int =>
      println(s"PrintActor: ${i}")
  }
}

Я просто создаю ActorSystem с диспетчерами по умолчанию и все действующие лица зависят от них. BlockingFutureActor имеет операцию блокировки, которая заключена в Future, PrintActor просто печатает число мгновенно.

В пояснении к документу диспетчеры по умолчанию будут заняты Futureв BlockingFutureActor, что приводит к блокировке сообщения PrintActor, Приложение застревает где-то вроде:

> PrintActor: 44
> PrintActor: 45

К сожалению, мой код не заблокирован. Все выходы из PrintActor показать гладко Но выводы из BlockingFutureActor появляются как сжимающая зубная паста. Я пытаюсь отслеживать информацию о моих нитях с помощью отладки Intellij, я получил: мониторинг потоков

Вы можете обнаружить, что только два диспетчера спят (BlockingFutureActor делает это случиться). Другие ждут, что означает, что они доступны для доставки новых сообщений.

Я прочитал ответ о блокировке в Actor ( страница). Цитируется, что "Диспетчеры, по сути, являются пулами потоков. Разделение двух гарантирует, что медленные блокирующие операции не будут морить голодом другие. Этот подход, как правило, называется массовым, потому что идея заключается в том, что если часть приложения выходит из строя, остальная часть остается отзывчивой ".

Диспетчеры по умолчанию оставляют некоторый диспетчер для блокировки операции? Таким образом, система может обрабатывать сообщения, даже если существует очень много операций блокировки, запрашивающих диспетчеров.

Можно ли воспроизвести эксперимент в документе Akka? Что-то не так с моей конфигурацией.

Спасибо за ваши предложения. С наилучшими пожеланиями.

1 ответ

Решение

Причина, по которой вы видите все 1000 печатных операторов из PrintActor до каких-либо печатных заявлений от BlockingFutureActor из-за первого Thread.sleep позвонить в BlockingFutureActor"s receive блок. это Thread.sleep является ключевым отличием вашего кода от примера в официальной документации:

override def receive: Receive = {
  case i: Int =>
    Thread.sleep(5000) // <----- this call is not in the example in the official docs
    implicit val excutionContext: ExecutionContext = context.dispatcher
    Future {
      ...
    }
}

Помните, что актеры обрабатывают одно сообщение за раз. Thread.sleep(5000) в основном имитирует сообщение, обработка которого занимает не менее пяти секунд. BlockingFutureActor не будет обрабатывать другое сообщение, пока не завершит обработку текущего сообщения, даже если в его почтовом ящике находятся сотни сообщений. В то время как BlockingFutureActor обрабатывает этот первый Int сообщение ценности 1, PrintActor уже завершил обработку всех 1000 сообщений, которые были отправлены на него. Чтобы сделать это более понятным, давайте добавим println заявление:

override def receive: Receive = {
  case i: Int =>
    println(s"Entering BlockingFutureActor's receive: $i") // <-----
    Thread.sleep(5000)
    implicit val excutionContext: ExecutionContext = context.dispatcher
    Future {
      ...
    }
}

Пример вывода при запуске программы:

Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...

Как видите, к тому времени BlockingFutureActor на самом деле начинает обрабатывать сообщение 2, PrintActor уже перебрал все 1000 сообщений.

Если вы удалите это первым Thread.sleepи вы увидите сообщения, снятые с BlockingFutureActorпочтовый ящик быстрее, потому что работа "делегируется" Future, Однажды Future создается, актер получает следующее сообщение из своего почтового ящика, не дожидаясь Future завершить. Ниже приведен пример вывода без этого первого Thread.sleep (он не будет одинаковым при каждом запуске):

Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...
Другие вопросы по тегам