Операция блокировки в Actor НЕ занимает все диспетчеры по умолчанию
Я изучаю Akka Actor недавно. Я прочитал документ диспетчеров в Actor. Мне любопытно по поводу операции блокировки у актера. Последняя тема в документе описывает, как решить проблему. И я пытаюсь воспроизвести пример эксперимента в документе.
Вот мой код:
package dispatcher
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Main extends App{
var config = ConfigFactory.parseString(
"""
|my-dispatcher{
|type = Dispatcher
|
|executor = "fork-join-executor"
|
|fork-join-executor{
|fixed-pool-size = 32
|}
|throughput = 1
|}
""".stripMargin)
// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))
val system = ActorSystem("block")
val actor1 = system.actorOf(Props(new BlockingFutureActor()))
val actor2 = system.actorOf(Props(new PrintActor()))
for(i <- 1 to 1000){
actor1 ! i
actor2 ! i
}
}
package dispatcher
import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}
class BlockingFutureActor extends Actor{
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
Thread.sleep(5000)
println(s"Blocking future finished ${i}")
}
}
}
package dispatcher
import akka.actor.Actor
class PrintActor extends Actor{
override def receive: Receive = {
case i: Int =>
println(s"PrintActor: ${i}")
}
}
Я просто создаю ActorSystem
с диспетчерами по умолчанию и все действующие лица зависят от них. BlockingFutureActor
имеет операцию блокировки, которая заключена в Future
, PrintActor
просто печатает число мгновенно.
В пояснении к документу диспетчеры по умолчанию будут заняты Future
в BlockingFutureActor
, что приводит к блокировке сообщения PrintActor
, Приложение застревает где-то вроде:
> PrintActor: 44
> PrintActor: 45
К сожалению, мой код не заблокирован. Все выходы из PrintActor
показать гладко Но выводы из BlockingFutureActor
появляются как сжимающая зубная паста. Я пытаюсь отслеживать информацию о моих нитях с помощью отладки Intellij, я получил:
Вы можете обнаружить, что только два диспетчера спят (BlockingFutureActor
делает это случиться). Другие ждут, что означает, что они доступны для доставки новых сообщений.
Я прочитал ответ о блокировке в Actor ( страница). Цитируется, что "Диспетчеры, по сути, являются пулами потоков. Разделение двух гарантирует, что медленные блокирующие операции не будут морить голодом другие. Этот подход, как правило, называется массовым, потому что идея заключается в том, что если часть приложения выходит из строя, остальная часть остается отзывчивой ".
Диспетчеры по умолчанию оставляют некоторый диспетчер для блокировки операции? Таким образом, система может обрабатывать сообщения, даже если существует очень много операций блокировки, запрашивающих диспетчеров.
Можно ли воспроизвести эксперимент в документе Akka? Что-то не так с моей конфигурацией.
Спасибо за ваши предложения. С наилучшими пожеланиями.
1 ответ
Причина, по которой вы видите все 1000 печатных операторов из PrintActor
до каких-либо печатных заявлений от BlockingFutureActor
из-за первого Thread.sleep
позвонить в BlockingFutureActor
"s receive
блок. это Thread.sleep
является ключевым отличием вашего кода от примера в официальной документации:
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000) // <----- this call is not in the example in the official docs
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
Помните, что актеры обрабатывают одно сообщение за раз. Thread.sleep(5000)
в основном имитирует сообщение, обработка которого занимает не менее пяти секунд. BlockingFutureActor
не будет обрабатывать другое сообщение, пока не завершит обработку текущего сообщения, даже если в его почтовом ящике находятся сотни сообщений. В то время как BlockingFutureActor
обрабатывает этот первый Int
сообщение ценности 1
, PrintActor
уже завершил обработку всех 1000 сообщений, которые были отправлены на него. Чтобы сделать это более понятным, давайте добавим println
заявление:
override def receive: Receive = {
case i: Int =>
println(s"Entering BlockingFutureActor's receive: $i") // <-----
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
Пример вывода при запуске программы:
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...
Как видите, к тому времени BlockingFutureActor
на самом деле начинает обрабатывать сообщение 2
, PrintActor
уже перебрал все 1000 сообщений.
Если вы удалите это первым Thread.sleep
и вы увидите сообщения, снятые с BlockingFutureActor
почтовый ящик быстрее, потому что работа "делегируется" Future
, Однажды Future
создается, актер получает следующее сообщение из своего почтового ящика, не дожидаясь Future
завершить. Ниже приведен пример вывода без этого первого Thread.sleep
(он не будет одинаковым при каждом запуске):
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...