Обработка ошибок в актерах Akka
У меня очень простой пример, где у меня есть актер (SimpleActor
) которые выполняют периодическую задачу, отправляя сообщение себе. Сообщение запланировано в конструкторе для актера. В обычном случае (т.е. без сбоев) все работает нормально.
Но что, если Актер столкнется с ошибками? У меня другой актер (SimpleActorWithFault
). У этого актера могут быть недостатки. В этом случае я сам генерирую один, генерируя исключение. Когда происходит сбой (т.е. SimpleActorWithFault
выдает исключение) он автоматически перезапускается. Однако этот перезапуск портит планировщик внутри Actor, который больше не работает как исключение. И если ошибки происходят достаточно быстро, это порождает более неожиданное поведение.
Мой вопрос: каков предпочтительный способ устранения неисправностей в таких случаях? Я знаю, что могу использовать Try
блоки для обработки исключений. Но что, если я расширяю другого актера, где я не могу поставить Try в суперклассе, или происходит случай, когда у меня исключительная ошибка в актере.
import akka.actor.{Props, ActorLogging}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.actor.Actor
case object MessageA
case object MessageToSelf
class SimpleActor extends Actor with ActorLogging {
//schedule a message to self every second
context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)
//keeps track of some internal state
var count: Int = 0
def receive: Receive = {
case MessageA => {
log.info("[SimpleActor] Got MessageA at %d".format(count))
}
case MessageToSelf => {
//update state and tell the world about its current state
count = count + 1
log.info("[SimpleActor] Got scheduled message at %d".format(count))
}
}
}
class SimpleActorWithFault extends Actor with ActorLogging {
//schedule a message to self every second
context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)
var count: Int = 0
def receive: Receive = {
case MessageA => {
log.info("[SimpleActorWithFault] Got MessageA at %d".format(count))
}
case MessageToSelf => {
count = count + 1
log.info("[SimpleActorWithFault] Got scheduled message at %d".format(count))
//at some point generate a fault
if (count > 5) {
log.info("[SimpleActorWithFault] Going to throw an exception now %d".format(count))
throw new Exception("Excepttttttiooooooon")
}
}
}
}
object MainApp extends App {
implicit val akkaSystem = akka.actor.ActorSystem()
//Run the Actor without any faults or exceptions
akkaSystem.actorOf(Props(classOf[SimpleActor]))
//comment the above line and uncomment the following to run the actor with faults
//akkaSystem.actorOf(Props(classOf[SimpleActorWithFault]))
}
2 ответа
Чтобы не испортить планировщик:
class SimpleActor extends Actor with ActorLogging {
private var cancellable: Option[Cancellable] = None
override def preStart() = {
//schedule a message to self every second
cancellable = Option(context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf))
}
override def postStop() = {
cancellable.foreach(_.cancel())
cancellable = None
}
...
Чтобы правильно обрабатывать исключения (akka.actor.Status.Failure предназначен для правильного ответа на запрос в случае использования шаблона Ask отправителем):
...
def receive: Receive = {
case MessageA => {
try {
log.info("[SimpleActor] Got MessageA at %d".format(count))
} catch {
case e: Exception =>
sender ! akka.actor.Status.Failure(e)
log.error(e.getMessage, e)
}
}
...
Правильный путь состоит в том, чтобы опустить рискованное поведение в своего актера. Этот шаблон называется шаблоном ядра ошибок (см. Раздел "Параллельный доступ Akka", раздел 8.5):
Эта модель описывает очень здравый подход к надзору, который отличает участников друг от друга на основе любого изменчивого состояния, которое они могут удерживать.
Короче говоря, это означает, что игрокам, чье состояние является ценным, нельзя позволить потерпеть неудачу или перезапустить. Любой субъект, хранящий ценные данные, защищен таким образом, что любые рискованные операции передаются подчиненному субъекту, который, если его перезапустить, вызывает только хорошие вещи.
Шаблон ядра ошибок подразумевает толкание уровней риска дальше вниз по дереву.
Смотрите также другой учебник здесь.
Так что в вашем случае это будет примерно так:
SimpleActor
|- ActorWithFault
Вот SimpleActor
выступает в качестве руководителя для ActorWithFault
, Стратегия по умолчанию для любого актера - перезапустить ребенка на Exception
и расширять что-либо еще: http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html
Эскалация означает, что сам актер может быть перезапущен. Поскольку вы действительно не хотите перезапустить SimpleActor
Вы могли бы сделать это всегда перезапустить ActorWithFault
и никогда не увеличивать, переопределяя стратегию супервизора:
class SimpleActor {
override def preStart(){
// our faulty actor --- we will supervise it from now on
context.actorOf(Props[ActorWithFault], "FaultyActor")
...
override val supervisorStrategy = OneForOneStrategy () {
case _: ActorKilledException => Escalate
case _: ActorInitializationException => Escalate
case _ => Restart // keep restarting faulty actor
}
}