Как повысить высших руководителей в Акке?
У меня есть следующий актер высшего уровня ("родитель для большинства"):
// Groovy pseudo-code
class Master extends UntypedActor {
ActorRef child1
ActorRef child2
ActorRef child3
ActorRef backup
@Override
void onReceive(Object message) throws Exception {
if(message instanceof Terminated) {
Terminated terminated = message as Terminated
if(terminated.actor != backup) {
terminated.actor = backup
} else {
// TODO: What to do here? How to escalate from here?
}
} else {
child1.tell(new DoSomething(message), getSelf())
child2.tell(new DoSomethingElse(message), getSelf())
child3.tell(new DoSomethingElser(message, getSelf())
}
}
@Override
SupervisorStrategy supervisorStrategy() {
new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
@Override
Directive apply(Throwable t) throws Exception {
if(isRecoverable(t) { // Don’t worry about how/where this is defined or how it works
SupervisorStrategy.stop()
} else {
SupervisorStrategy.escalate()
}
}
})
}
}
Как видите, он контролирует трех детей, и когда эти три ребенка выдают "восстанавливаемые" исключения, они останавливаются и заменяются резервной копией. Все идет нормально.
Проблема, с которой я сейчас сталкиваюсь, заключается в том, что если резервные актеры бросают какие-либо бросаемые объекты, я хочу рассмотреть это Master
актер (и действительно, мое приложение в целом), чтобы быть в состоянии, когда оно не может продолжать обрабатывать любые входные данные, и перевести исключение на уровень опекуна.
Я новичок в Akka и не уверен, где разместить этот код и как он должен выглядеть. Опять же, мне просто нужна логика, которая говорит:
- Если резервный актер выбрасывает любой бросаемый объект, переведите исключение в
Master
родитель, который действительно должен быть актером / конструкцией " акка"
Первая часть этого заключается в том, что нам нужно знать, когда выдается исключение из резервной копии; Я могу справиться с этой частью, поэтому давайте представим, что наша стратегия теперь выглядит так:
@Override
SupervisorStrategy supervisorStrategy() {
new OneForOneStrategy(10, Duration.minutes(“1 minute”, new Future<Throwable, SupervisorStrategy.Directive> {
@Override
Directive apply(Throwable t) throws Exception {
if(wasThrownFromBackup(t)) {
SupervisorStrategy.escalate()
} else if(isRecoverable(t) {
SupervisorStrategy.stop()
} else {
SupervisorStrategy.escalate()
}
}
})
}
Но, как вы можете видеть, я все еще изо всех сил пытаюсь реализовать эскалацию "из системы акторов". Идеи? Пример кода Java очень предпочтителен, так как для меня Scala выглядит иероглифами.
1 ответ
Взгляните на шаблон "Жнец" здесь http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2 Извините, это в Scala, но я думаю, что его достаточно легко перевести на Java.
Также посмотрите здесь, https://groups.google.com/forum/
Вы должны установить в своей конфигурации
akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"
Это приведет к тому, что любой актер "верхнего уровня", который обостряется, будет остановлен системой. Затем вы реализуете другого актера высшего уровня под названием "Жнец" (или как вы хотите его называть), у которого есть только одна работа, наблюдаете за главным актером высшего уровня и принимаете меры (например, context.system.shutdown()
) когда актер верхнего уровня останавливается.
Я не знаю API akka java, поэтому не могу предоставить вам точный пример, но в Scala, из блога LetItCrash выше, он выглядит следующим образом:
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer
object Reaper {
// Used by others to register an Actor for watching
case class WatchMe(ref: ActorRef)
}
abstract class Reaper extends Actor {
import Reaper._
// Keep track of what we're watching
val watched = ArrayBuffer.empty[ActorRef]
// Derivations need to implement this method. It's the
// hook that's called when everything's dead
def allSoulsReaped(): Unit
// Watch and check for termination
final def receive = {
case WatchMe(ref) =>
context.watch(ref)
watched += ref
case Terminated(ref) =>
watched -= ref
if (watched.isEmpty) allSoulsReaped()
}
}
class ProductionReaper extends Reaper {
// Shutdown
def allSoulsReaped(): Unit = context.system.shutdown()
}
При запуске приложения вы создаете своего главного актера, создаете своего жнеца, отправляете WatchMe(masterActor)
сообщение жнецу.