Акка: как я могу поймать неудачу одного актера внутри другого (не дочернего) актера?
У меня есть два актера:
ProcessManager, который обрабатывает некоторые процессы в системе (например, регистрация пользователя, покупка и т. Д.)
Уведомитель - должен уведомлять пользователя, если в ProcessManager произошла какая-либо ошибка. Мне нужно отловить сбой актера ProcessManager (он был сбой и остановлен по какой-либо причине, например, из-за исключения ActorInitializationException или максимального времени перезапуска, и актер диспетчера процессов был остановлен).
class ProcessManager extends Actor {
override def receive: Receive = {
...
}
}
class Notifier extends Actor {
override def receive: Receive = {
PROCESS MANAGER ACTOR FAILED AND STOPPED =>
// Here I need to catch failure of ProcessManager actor
// (it was failed and stopped for what ever
// reason, for example, because of ActorInitializationException
// or max restart time reached and Process manager actor was stopped).
//
// Then do some stuff, for example, send message to the client via web socket.
}
}
class MyController @Inject() (cc: ControllerComponents, actorSystem: ActorSystem)
(implicit exec: ExecutionContext) extends AbstractController(cc) {
// I need to catch failure of processManager in this actor.
val notifier = actorSystem.actorOf(Props(classOf[Notifier]))
def registerUser = Action.async {
// Actor may be stopped because of ActorInitializationException here
val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))
...
// OR it may be stopped here for any reason.
processManager ! "some message which will fail and stop pm actor"
Future.successfull(Ok("Thanks."))
}
}
Как я могу поймать завершение (из-за сбоя) субъекта ProcessManager внутри субъекта Notifier?
РЕДАКТИРОВАТЬ Позвольте мне объяснить контекст моей проблемы.
Я создаю PM актера в контроллере Play и отправляю ему сообщение (Tell), и я немедленно возвращаю Ok ответ пользователю. Актер PM создает другого дочернего актера, и во время создания генерируется исключение ActorInitializationException. Мне нужно уведомить пользователя (через веб-сокет, с помощью актера Notifier), что что-то пошло не так.
1 ответ
Ты можешь использовать DeathWatch
зарегистрировать Notifier
актер для получения Terminated
сообщение, когда ProcessManager
Актер постоянно останавливается. Notifier
понадобится ссылка на ProcessManager
актер для DeathWatch
и один из способов сделать это - отправить ссылку на ProcessManager
как сообщение (это безопасно, потому что ActorRef
неизменяем и сериализуем).
class Notifier extends Actor {
var processManager: Option[ActorRef] = None
def receive: Receive = {
case aRef: ActorRef =>
if (processManager.isEmpty) {
processManager = Some(aRef)
context.watch(aRef) // register to "watch" the process manager
}
case Terminated =>
// process manager was permanently stopped
case ...
}
}
object Demo extends App {
val actorSystem = ActorSystem("my-actor-system")
val notifier = actorSystem.actorOf(Props(classOf[Notifier]))
val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))
notifier ! processManager // send processManager's ref to the notifier
...
processManager ! "some message which will fail and stop pm actor"
...
}
Одно предостережение: это может быть невозможно для DeathWatch
регистрация произойдет до ActorInitializationException
бросается при попытке создать ProcessManager
,
Если вам нужно отправить сообщение Notifier
когда ребенок ProcessManager
выдает исключение, затем переопределяет стратегию супервизора в ProcessManager
и отправьте это сообщение как часть стратегии. Что-то вроде:
class ProcessManager extends Actor {
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ActorInitializationException =>
val notifier = context.actorSelection("/path/to/notifier")
notifier ! CustomErrorMessage
Stop
case _: Exception => Escalate
}
def receive: Receive = {
...
}
}