Акка: как я могу поймать неудачу одного актера внутри другого (не дочернего) актера?

У меня есть два актера:

ProcessManager, который обрабатывает некоторые процессы в системе (например, регистрация пользователя, покупка и т. Д.)

Уведомитель - должен уведомлять пользователя, если в ProcessManager произошла какая-либо ошибка. Мне нужно отловить сбой актера ProcessManager (он был сбой и остановлен по какой-либо причине, например, из-за исключения ActorInitializationException или максимального времени перезапуска, и актер диспетчера процессов был остановлен).

   class ProcessManager extends Actor {
      override def receive: Receive = {
        ...
      }
    }

    class Notifier extends Actor {
      override def receive: Receive = {
        PROCESS MANAGER ACTOR FAILED AND STOPPED =>
          // Here I need to catch failure of ProcessManager actor
          // (it was failed and stopped for what ever
          // reason, for example, because of ActorInitializationException
          // or max restart time reached and Process manager actor was stopped).
          //
          // Then do some stuff, for example, send message to the client via web socket.
      }
    }


    class MyController @Inject() (cc: ControllerComponents, actorSystem: ActorSystem)
      (implicit exec: ExecutionContext) extends AbstractController(cc)  {


      // I need to catch failure of processManager in this actor.
      val notifier = actorSystem.actorOf(Props(classOf[Notifier]))

      def registerUser = Action.async {         

          // Actor may be stopped because of ActorInitializationException here
          val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))
              ...

         // OR it may be stopped here for any reason.
         processManager ! "some message which will fail and stop pm actor"

         Future.successfull(Ok("Thanks."))   
       }
   }

Как я могу поймать завершение (из-за сбоя) субъекта ProcessManager внутри субъекта Notifier?

РЕДАКТИРОВАТЬ Позвольте мне объяснить контекст моей проблемы.

Я создаю PM актера в контроллере Play и отправляю ему сообщение (Tell), и я немедленно возвращаю Ok ответ пользователю. Актер PM создает другого дочернего актера, и во время создания генерируется исключение ActorInitializationException. Мне нужно уведомить пользователя (через веб-сокет, с помощью актера Notifier), что что-то пошло не так.

1 ответ

Ты можешь использовать DeathWatch зарегистрировать Notifier актер для получения Terminated сообщение, когда ProcessManager Актер постоянно останавливается. Notifier понадобится ссылка на ProcessManager актер для DeathWatch и один из способов сделать это - отправить ссылку на ProcessManager как сообщение (это безопасно, потому что ActorRef неизменяем и сериализуем).

class Notifier extends Actor {
  var processManager: Option[ActorRef] = None

  def receive: Receive = {
    case aRef: ActorRef =>
      if (processManager.isEmpty) {
        processManager = Some(aRef)
        context.watch(aRef) // register to "watch" the process manager
      }
    case Terminated =>
      // process manager was permanently stopped
    case ...
  }
}

object Demo extends App {
  val actorSystem = ActorSystem("my-actor-system")

  val notifier = actorSystem.actorOf(Props(classOf[Notifier]))
  val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))

  notifier ! processManager // send processManager's ref to the notifier
  ...
  processManager ! "some message which will fail and stop pm actor"
  ...
}

Одно предостережение: это может быть невозможно для DeathWatch регистрация произойдет до ActorInitializationException бросается при попытке создать ProcessManager,


Если вам нужно отправить сообщение Notifier когда ребенок ProcessManager выдает исключение, затем переопределяет стратегию супервизора в ProcessManager и отправьте это сообщение как часть стратегии. Что-то вроде:

class ProcessManager extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ActorInitializationException =>
        val notifier = context.actorSelection("/path/to/notifier")
        notifier ! CustomErrorMessage
        Stop
      case _: Exception => Escalate
   }

   def receive: Receive = {
     ...
   }
}
Другие вопросы по тегам