Акка: тестирование, мониторинг \ наблюдение за смертью

В моем сценарии у меня есть 2 актера:

  1. watchee (Я использую TestProbe)
  2. watcher (Watcher завернутый в TestActorRef выставить некоторые внутренние state Я отслеживаю в своем тесте)

Наблюдатель должен предпринять некоторые действия, когда watchee умирает.

Вот полный тестовый пример, который я написал до сих пор:

class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {

  def this() = this(ActorSystem("TempTest"))

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  class WatcherActor(watchee: ActorRef) extends Actor {

    var state = "initial"
    context.watch(watchee)

    override def receive: Receive = {
      case "start" =>
        state = "start"
      case _: Terminated =>
        state = "terminated"
    }

  }

  test("example") {
    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

    assert(watcher.underlyingActor.state === "initial")

    watcher ! "start" // "start" will be sent and handled by watcher synchronously
    assert(watcher.underlyingActor.state === "start")

    system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
    Thread.sleep(100) // what is the best way to avoid blocking here?
    assert(watcher.underlyingActor.state === "terminated")
  }

}

Теперь, поскольку все задействованные актеры используют CallingThreadDispatcher (все тестовые помощники Акки создаются с использованием реквизита с .withDispatcher(CallingThreadDispatcher.Id)) Я могу с уверенностью предположить, что, когда это утверждение возвращает:

watcher ! "start"

... сообщение "Пуск" уже обработано WatchingActor и, таким образом, я могу сделать утверждения, основанные на watcher.underlyingActor.state

Однако, исходя из моих наблюдений, когда я останавливаюсь watchee с помощью system.stop или отправив Kill к этому Terminated сообщение производится как побочный эффект watchee смерть исполняется асинхронно, в другом потоке.

Не решение - остановить watcheeзаблокировать поток на некоторое время и проверить Watcher после этого, но я хотел бы знать, как мне сделать это правильно (т.е. как быть уверенным, что после убийства актера его наблюдатель получен и обработан Terminated сообщение о смерти)?

2 ответа

Решение

РЕДАКТИРОВАТЬ: После обсуждения и тестирования с OP, мы обнаружили, что отправка PoisonPill в качестве средства завершения наблюдаемого субъекта достигает желаемого поведения, так как завершенные из PPill обрабатываются синхронно, а из остановок или уничтожений - асинхронно.

Хотя мы не уверены в причине, наша лучшая ставка заключается в том, что убийство актера вызывает исключение, а PPilling - нет.

Очевидно, это не имеет ничего общего с использованием шаблона gracefulStop в соответствии с моим первоначальным предложением, о котором я расскажу ниже.

Таким образом, решение проблемы OP состояло в том, чтобы просто завершить отслеживание действующего субъекта, отправляющего PPill вместо сообщения Kill или с помощью system.stop.


Старый ответ начинается здесь:

Возможно, я собираюсь предложить что-то немного не связанное, но я чувствую, что это может применяться.

Если я правильно понял, то, что вы хотите сделать, это в основном синхронно завершить актера, то есть сделать что-то, что возвращается только после того, как актер официально умер, и его смерть была записана (в вашем случае наблюдателем).

В общем, уведомление о смерти, как и большинство других в акке, является асинхронным. Тем не менее, можно получить синхронное подтверждение смерти, используя шаблон gracefulStop (akka.pattern.gracefulStop).

Для этого код должен быть примерно таким:

val timeout = 5.seconds
val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill)
Await.result(killResultFuture, timeout)

Для этого нужно отправить PoisonPill жертве (примечание: вы можете использовать собственное сообщение), которая ответит будущим, которое завершится после смерти жертвы. Используя Await.result вы гарантированно будете синхронны.

К сожалению, это применимо только в том случае, если а) вы активно убиваете жертву и вместо этого не хотите реагировать на внешнюю причину смерти; б) вы можете использовать тайм-ауты и блокировки в своем коде. Но, возможно, вы можете адаптировать эту модель к вашей ситуации.

Одним из способов решения этой проблемы является введение в ваш тест другого наблюдателя, который также наблюдает за watchee, Этот другой наблюдатель является TestProbe что позволит нам выполнить на нем утверждение, которое избавит вас от проблем с синхронизацией, которые вы видите. Во-первых, модифицированный тестовый код:

 val watchee = TestProbe()
 val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
 val probeWatcher = TestProbe()
 probeWatcher watch watchee.ref

 assert(watcher.underlyingActor.state === "initial")

 watcher ! "start" // "start" will be sent and handled by watcher synchronously
 assert(watcher.underlyingActor.state === "start")

 system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
 probeWatcher.expectTerminated(watchee.ref)
 assert(watcher.underlyingActor.state === "terminated")

Итак, вы можете видеть, что я представил дополнительный наблюдатель со строками:

val probeWatcher = TestProbe()
probeWatcher watch watchee.ref

Затем, позже в коде, перед последним утверждением, которое не работает для вас, я использую другое утверждение, которое позволяет мне знать, что Terminated сообщение для остановленного актера было правильно распространено:

probeWatcher.expectTerminated (watchee.ref)

Когда код проходит эту строку, я могу быть уверен, что watcher тестируемое также получило свое прерванное сообщение, и утверждение, которое следует выполнить, пройдет.

РЕДАКТИРОВАТЬ

Как отмечено ФП, в этом коде есть уровень недетерминизма. Другое возможное решение - изменить строку в тестовом коде, которая останавливает актера:

watcher.underlyingActor.context.stop(watchee.ref)

Используя context из TestActorRef Я верю Terminated будет доставлено все через CallingThreadDispatcher и, таким образом, быть полностью синхронным. Я проверил это в цикле, и он работал для меня более 1000 итераций.

Теперь я подумал, что, может быть, потому что я выполнял stop используя тот же актер, который ожидал Terminated что, возможно, была оптимизация для доставки Terminated сам для этого Scanario, поэтому я также проверил это с совершенно другим Actor следующее:

class FooActor extends Actor{
  def receive = {
    case _ =>
  }

Тогда в тестовом коде:

val foo = TestActorRef(new FooActor)

И на остановке:

foo.underlyingActor.context.stop(watchee.ref)

Это также сработало, как и ожидалось.

Другие вопросы по тегам