Обработка ошибок в актерах Akka

У меня очень простой пример, где у меня есть актер (SimpleActor) которые выполняют периодическую задачу, отправляя сообщение себе. Сообщение запланировано в конструкторе для актера. В обычном случае (т.е. без сбоев) все работает нормально.

Но что, если Актер столкнется с ошибками? У меня другой актер (SimpleActorWithFault). У этого актера могут быть недостатки. В этом случае я сам генерирую один, генерируя исключение. Когда происходит сбой (т.е. SimpleActorWithFault выдает исключение) он автоматически перезапускается. Однако этот перезапуск портит планировщик внутри Actor, который больше не работает как исключение. И если ошибки происходят достаточно быстро, это порождает более неожиданное поведение.

Мой вопрос: каков предпочтительный способ устранения неисправностей в таких случаях? Я знаю, что могу использовать Try блоки для обработки исключений. Но что, если я расширяю другого актера, где я не могу поставить Try в суперклассе, или происходит случай, когда у меня исключительная ошибка в актере.

import akka.actor.{Props, ActorLogging}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.actor.Actor

case object MessageA

case object MessageToSelf


class SimpleActor extends Actor with ActorLogging {

  //schedule a message to self every second
  context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)

  //keeps track of some internal state
  var count: Int = 0

  def receive: Receive = {
    case MessageA => {
      log.info("[SimpleActor] Got MessageA at %d".format(count))
    }
    case MessageToSelf => {
      //update state and tell the world about its current state 
      count = count + 1
      log.info("[SimpleActor] Got scheduled message at %d".format(count))

    }
  }

}


class SimpleActorWithFault extends Actor with ActorLogging {

  //schedule a message to self every second
  context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)

  var count: Int = 0

  def receive: Receive = {
    case MessageA => {
      log.info("[SimpleActorWithFault] Got MessageA at %d".format(count))
    }
    case MessageToSelf => {
      count = count + 1
      log.info("[SimpleActorWithFault] Got scheduled message at %d".format(count))

      //at some point generate a fault
      if (count > 5) {
        log.info("[SimpleActorWithFault] Going to throw an exception now %d".format(count))
        throw new Exception("Excepttttttiooooooon")
      }
    }
  }

}


object MainApp extends App {
  implicit val akkaSystem = akka.actor.ActorSystem()
  //Run the Actor without any faults or exceptions 
  akkaSystem.actorOf(Props(classOf[SimpleActor]))

  //comment the above line and uncomment the following to run the actor with faults  
  //akkaSystem.actorOf(Props(classOf[SimpleActorWithFault]))

}

2 ответа

Решение

Чтобы не испортить планировщик:

class SimpleActor extends Actor with ActorLogging {

  private var cancellable: Option[Cancellable] = None

  override def preStart() = {
    //schedule a message to self every second
    cancellable = Option(context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf))
  }

  override def postStop() = {
    cancellable.foreach(_.cancel())
    cancellable = None
  }
...

Чтобы правильно обрабатывать исключения (akka.actor.Status.Failure предназначен для правильного ответа на запрос в случае использования шаблона Ask отправителем):

...
def receive: Receive = {
    case MessageA => {
      try {
        log.info("[SimpleActor] Got MessageA at %d".format(count))
      } catch {
        case e: Exception =>
          sender ! akka.actor.Status.Failure(e)
          log.error(e.getMessage, e)
      }
    }
...

Правильный путь состоит в том, чтобы опустить рискованное поведение в своего актера. Этот шаблон называется шаблоном ядра ошибок (см. Раздел "Параллельный доступ Akka", раздел 8.5):

Эта модель описывает очень здравый подход к надзору, который отличает участников друг от друга на основе любого изменчивого состояния, которое они могут удерживать.

Короче говоря, это означает, что игрокам, чье состояние является ценным, нельзя позволить потерпеть неудачу или перезапустить. Любой субъект, хранящий ценные данные, защищен таким образом, что любые рискованные операции передаются подчиненному субъекту, который, если его перезапустить, вызывает только хорошие вещи.

Шаблон ядра ошибок подразумевает толкание уровней риска дальше вниз по дереву.

Смотрите также другой учебник здесь.

Так что в вашем случае это будет примерно так:

SimpleActor 
 |- ActorWithFault

Вот SimpleActor выступает в качестве руководителя для ActorWithFault, Стратегия по умолчанию для любого актера - перезапустить ребенка на Exception и расширять что-либо еще: http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html

Эскалация означает, что сам актер может быть перезапущен. Поскольку вы действительно не хотите перезапустить SimpleActor Вы могли бы сделать это всегда перезапустить ActorWithFault и никогда не увеличивать, переопределяя стратегию супервизора:

class SimpleActor {
  override def preStart(){
    // our faulty actor --- we will supervise it from now on
    context.actorOf(Props[ActorWithFault], "FaultyActor") 
  ...

  override val supervisorStrategy = OneForOneStrategy () {
    case _: ActorKilledException => Escalate
    case _: ActorInitializationException => Escalate
    case _ => Restart // keep restarting faulty actor
  }

}
Другие вопросы по тегам