Akka - как родитель отправляет сообщение ребенку после того, как у ребенка было исключение

Если у меня есть parent актер, который отправляет одно сообщение за раз child актер.

Когда дочерний процесс завершает обработку текущего сообщения, он уведомляет родителя, что, в свою очередь, отправит новое сообщение дочернему элементу.

Чтобы сохранить этот цикл, даже если дочерний элемент падает на определенном сообщении, я добавил SupervisorStrategy к родителю:

    private static SupervisorStrategy strategy =
        new OneForOneStrategy(10, Duration.create("1 minute"),
                new Function<Throwable, SupervisorStrategy.Directive>() {
                    @Override
                    public SupervisorStrategy.Directive apply(Throwable t) {
                        if (t instanceof NullPointerException) {
                            return resume();
                        } else {
                            return escalate();
                        }
                    }
                });

Идея состоит в том, что, что бы ни случилось с ребенком, оно возобновится, и родитель сможет отправить ему следующее сообщение.

Но как родитель узнает, когда произошла ошибка, чтобы отправить следующее сообщение?

Что является причиной для родителя, что что-то случилось с ребенком?

Есть ли что-то вроде метода "onChileError", который мне нужно переопределить?

(будет ценить пример Java поверх Scala)

Благодарю.

2 ответа

Когда дочерний процесс завершает обработку текущего сообщения, он уведомляет родителя, что в свою очередь отправит новое сообщение дочернему элементу....

Идея состоит в том, что, что бы ни случилось с ребенком, оно возобновится, и родитель сможет отправить ему следующее сообщение.

Ваша текущая стратегия супервизора делает следующее: Если NullPointerException брошен в ребенка, ребенок возобновляется, как будто ничего не случилось. Если выдается другое исключение, оно обостряется дальше по иерархии акторов, что, вероятно, означает, что стратегия супервизора по умолчанию срабатывает, и в этом случае дочерний процесс перезапускается. Если вы хотите, чтобы ребенок возобновил учебу независимо от исключения, измените свою стратегию на следующую (я использую DeciderBuilder):

private static SupervisorStrategy strategy =
  new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), DeciderBuilder.
    match(Exception.class, e -> {
      // getSelf() is the parent and getSender() is the child
      GiveMeAnotherMessage message = // ...
      getSelf().tell(message, getSender());
      resume();
    })
    .matchAny(o -> escalate())
    .build());

@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}

Но как родитель узнает, когда произошла ошибка, чтобы отправить следующее сообщение?

Что является причиной для родителя, что что-то случилось с ребенком?

Есть ли что-то вроде метода "onChileError", который мне нужно переопределить?

Решающий может получить доступ к ссылке на текущий отказавший потомок через getSender() так что все, что вам нужно сделать, это отправить сообщение родителю, чтобы сообщить ему, что ребенок готов обработать другое сообщение из самой стратегии, как показано в приведенном выше коде.

Я бы предложил реализовать следующий сценарий (примеры в Scala):

Настройте стратегию супервизора, чтобы останавливаться на ошибках

// Stop actors on error - DeathWatch will handle restart sequence
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

Подписаться на часы смерти ребенка на создание

def createWorker() = {
  val worker = actorOf(...)
  context.watch(worker)
  worker
}

Если ребенок неожиданно уволен, создайте новый и повторно отправьте текущую часть работы (или следующую, если необходимо)

case Terminated(actor) => 
  val worker = createWorker()
  worker ! DoWork(...)

Это также поможет, как только вы начнете масштабировать актеров в кластере, так как, когда узел становится недоступным в кластере, он не будет запускать стратегию наблюдения, только удаленное наблюдение за смертью.

Другие вопросы по тегам