akka - Ошибка шаблона ядра и планировщика

Я реализовал шаблон ядра ошибки в образце, который пытался создать.

Идея заключалась в том, чтобы создать диалог между 3 участниками, но один не отвечает сразу, вам нужно отправить несколько сообщений, прежде чем получить ответ. Реализация должна быть устойчивой, даже если акторы работают в распределенной среде.

Итак, я создал дочерний субъект, который пытается использовать планировщик, и, когда наконец приходит ответ, дочернее уведомление становится родительским и останавливается.

Даже если это решение сработает, у меня мало сомнений.

Сначала в дочернем акторе я реализовал планировщик, который выполняет функцию каждые 50 миллисекунд, у меня мало сомнений по поводу контекста выполнения. Я имею в виду, может ли код внутри метода sendMessage изменять своего собственного актера?

Что происходит, когда актер планирует казнь, ожидая, пока актер закончит свою работу?

import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.postfixOps

var cancellableSchedule : Option[Cancellable] = None

var counter = 0
var maxCounter = 10

def receive = LoggingReceive {
    case r:MessageB2C_Ack => {
      log.info("ActorChildB - Received MessageB2C_Ack from " + sender())
      parentActor ! r
      context.stop(self)
    }
    case r:SendMessage => {
      log.info("ActorChildB - Received SendMessage from " + sender())
        sendScheduledMessage
    }
}

private def sendScheduledMessage(): Unit = {
    context.system.scheduler.scheduleOnce(0 milliseconds){
      sendMessage(0)
    }
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.language.postfixOps
    cancellableSchedule =  Option(context.system.scheduler.schedule(0 milliseconds, 50 millisecond){
      sendMessage()
    })
}

private def sendMessage(): Unit = {
      log.info("ActorChildB - sendMessage " + msg.getClass.getName + " to " + dest)
      if (counter < maxCounter) {
        dest ! msg
      log.info("ActorChildB - sendMessage " + msg.getClass.getName + " to " + dest)
      if (counter < maxCounter) {
        dest ! msg
        context.system.scheduler.scheduleOnce(50 milliseconds){
          sendMessage(counter + 1)
        }
        counter = counter + 1
      } else  {
        throw new MyRetryTimeoutException("Fine")
      }
}

override def postStop(): Unit = {
    cancellableSchedule.foreach(c => c.cancel())
    super.postStop()
}

0 ответов

Другие вопросы по тегам