Как повысить высших руководителей в Акке?

У меня есть следующий актер высшего уровня ("родитель для большинства"):

// Groovy pseudo-code
class Master extends UntypedActor {
    ActorRef child1
    ActorRef child2
    ActorRef child3
    ActorRef backup

    @Override
    void onReceive(Object message) throws Exception {
        if(message instanceof Terminated) {
            Terminated terminated = message as Terminated
            if(terminated.actor != backup) {
                terminated.actor = backup
            } else {
                // TODO: What to do here? How to escalate from here?
            }
        } else {
            child1.tell(new DoSomething(message), getSelf())
            child2.tell(new DoSomethingElse(message), getSelf())
            child3.tell(new DoSomethingElser(message, getSelf())
        }
    }

    @Override
    SupervisorStrategy supervisorStrategy() {
        new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
            @Override
            Directive apply(Throwable t) throws Exception {
                if(isRecoverable(t) {   // Don’t worry about how/where this is defined or how it works
                    SupervisorStrategy.stop()
                } else {
                    SupervisorStrategy.escalate()
                }
            }
        })
    }
}

Как видите, он контролирует трех детей, и когда эти три ребенка выдают "восстанавливаемые" исключения, они останавливаются и заменяются резервной копией. Все идет нормально.

Проблема, с которой я сейчас сталкиваюсь, заключается в том, что если резервные актеры бросают какие-либо бросаемые объекты, я хочу рассмотреть это Master актер (и действительно, мое приложение в целом), чтобы быть в состоянии, когда оно не может продолжать обрабатывать любые входные данные, и перевести исключение на уровень опекуна.

Я новичок в Akka и не уверен, где разместить этот код и как он должен выглядеть. Опять же, мне просто нужна логика, которая говорит:

  • Если резервный актер выбрасывает любой бросаемый объект, переведите исключение в Masterродитель, который действительно должен быть актером / конструкцией " акка"

Первая часть этого заключается в том, что нам нужно знать, когда выдается исключение из резервной копии; Я могу справиться с этой частью, поэтому давайте представим, что наша стратегия теперь выглядит так:

@Override
SupervisorStrategy supervisorStrategy() {
    new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
        @Override
        Directive apply(Throwable t) throws Exception {
            if(wasThrownFromBackup(t)) {
                SupervisorStrategy.escalate()
            } else if(isRecoverable(t) {
                SupervisorStrategy.stop()
            } else {
                SupervisorStrategy.escalate()
            }
        }
    })
}

Но, как вы можете видеть, я все еще изо всех сил пытаюсь реализовать эскалацию "из системы акторов". Идеи? Пример кода Java очень предпочтителен, так как для меня Scala выглядит иероглифами.

1 ответ

Решение

Взгляните на шаблон "Жнец" здесь http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2 Извините, это в Scala, но я думаю, что его достаточно легко перевести на Java.

Также посмотрите здесь, https://groups.google.com/forum/

Вы должны установить в своей конфигурации

akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"

Это приведет к тому, что любой актер "верхнего уровня", который обостряется, будет остановлен системой. Затем вы реализуете другого актера высшего уровня под названием "Жнец" (или как вы хотите его называть), у которого есть только одна работа, наблюдаете за главным актером высшего уровня и принимаете меры (например, context.system.shutdown()) когда актер верхнего уровня останавливается.

Я не знаю API akka java, поэтому не могу предоставить вам точный пример, но в Scala, из блога LetItCrash выше, он выглядит следующим образом:

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method.  It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

class ProductionReaper extends Reaper {
  // Shutdown
  def allSoulsReaped(): Unit = context.system.shutdown()
}

При запуске приложения вы создаете своего главного актера, создаете своего жнеца, отправляете WatchMe(masterActor) сообщение жнецу.

Другие вопросы по тегам