Акка: тестирование, мониторинг \ наблюдение за смертью
В моем сценарии у меня есть 2 актера:
watchee
(Я используюTestProbe
)watcher
(Watcher
завернутый вTestActorRef
выставить некоторые внутренниеstate
Я отслеживаю в своем тесте)
Наблюдатель должен предпринять некоторые действия, когда watchee
умирает.
Вот полный тестовый пример, который я написал до сих пор:
class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TempTest"))
override def afterAll {
TestKit.shutdownActorSystem(system)
}
class WatcherActor(watchee: ActorRef) extends Actor {
var state = "initial"
context.watch(watchee)
override def receive: Receive = {
case "start" =>
state = "start"
case _: Terminated =>
state = "terminated"
}
}
test("example") {
val watchee = TestProbe()
val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
assert(watcher.underlyingActor.state === "initial")
watcher ! "start" // "start" will be sent and handled by watcher synchronously
assert(watcher.underlyingActor.state === "start")
system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
Thread.sleep(100) // what is the best way to avoid blocking here?
assert(watcher.underlyingActor.state === "terminated")
}
}
Теперь, поскольку все задействованные актеры используют CallingThreadDispatcher
(все тестовые помощники Акки создаются с использованием реквизита с .withDispatcher(CallingThreadDispatcher.Id)
) Я могу с уверенностью предположить, что, когда это утверждение возвращает:
watcher ! "start"
... сообщение "Пуск" уже обработано WatchingActor
и, таким образом, я могу сделать утверждения, основанные на watcher.underlyingActor.state
Однако, исходя из моих наблюдений, когда я останавливаюсь watchee
с помощью system.stop
или отправив Kill
к этому Terminated
сообщение производится как побочный эффект watchee
смерть исполняется асинхронно, в другом потоке.
Не решение - остановить watchee
заблокировать поток на некоторое время и проверить Watcher
после этого, но я хотел бы знать, как мне сделать это правильно (т.е. как быть уверенным, что после убийства актера его наблюдатель получен и обработан Terminated
сообщение о смерти)?
2 ответа
РЕДАКТИРОВАТЬ: После обсуждения и тестирования с OP, мы обнаружили, что отправка PoisonPill в качестве средства завершения наблюдаемого субъекта достигает желаемого поведения, так как завершенные из PPill обрабатываются синхронно, а из остановок или уничтожений - асинхронно.
Хотя мы не уверены в причине, наша лучшая ставка заключается в том, что убийство актера вызывает исключение, а PPilling - нет.
Очевидно, это не имеет ничего общего с использованием шаблона gracefulStop в соответствии с моим первоначальным предложением, о котором я расскажу ниже.
Таким образом, решение проблемы OP состояло в том, чтобы просто завершить отслеживание действующего субъекта, отправляющего PPill вместо сообщения Kill или с помощью system.stop.
Старый ответ начинается здесь:
Возможно, я собираюсь предложить что-то немного не связанное, но я чувствую, что это может применяться.
Если я правильно понял, то, что вы хотите сделать, это в основном синхронно завершить актера, то есть сделать что-то, что возвращается только после того, как актер официально умер, и его смерть была записана (в вашем случае наблюдателем).
В общем, уведомление о смерти, как и большинство других в акке, является асинхронным. Тем не менее, можно получить синхронное подтверждение смерти, используя шаблон gracefulStop (akka.pattern.gracefulStop).
Для этого код должен быть примерно таким:
val timeout = 5.seconds
val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill)
Await.result(killResultFuture, timeout)
Для этого нужно отправить PoisonPill жертве (примечание: вы можете использовать собственное сообщение), которая ответит будущим, которое завершится после смерти жертвы. Используя Await.result вы гарантированно будете синхронны.
К сожалению, это применимо только в том случае, если а) вы активно убиваете жертву и вместо этого не хотите реагировать на внешнюю причину смерти; б) вы можете использовать тайм-ауты и блокировки в своем коде. Но, возможно, вы можете адаптировать эту модель к вашей ситуации.
Одним из способов решения этой проблемы является введение в ваш тест другого наблюдателя, который также наблюдает за watchee
, Этот другой наблюдатель является TestProbe
что позволит нам выполнить на нем утверждение, которое избавит вас от проблем с синхронизацией, которые вы видите. Во-первых, модифицированный тестовый код:
val watchee = TestProbe()
val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
val probeWatcher = TestProbe()
probeWatcher watch watchee.ref
assert(watcher.underlyingActor.state === "initial")
watcher ! "start" // "start" will be sent and handled by watcher synchronously
assert(watcher.underlyingActor.state === "start")
system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
probeWatcher.expectTerminated(watchee.ref)
assert(watcher.underlyingActor.state === "terminated")
Итак, вы можете видеть, что я представил дополнительный наблюдатель со строками:
val probeWatcher = TestProbe()
probeWatcher watch watchee.ref
Затем, позже в коде, перед последним утверждением, которое не работает для вас, я использую другое утверждение, которое позволяет мне знать, что Terminated
сообщение для остановленного актера было правильно распространено:
probeWatcher.expectTerminated (watchee.ref)
Когда код проходит эту строку, я могу быть уверен, что watcher
тестируемое также получило свое прерванное сообщение, и утверждение, которое следует выполнить, пройдет.
РЕДАКТИРОВАТЬ
Как отмечено ФП, в этом коде есть уровень недетерминизма. Другое возможное решение - изменить строку в тестовом коде, которая останавливает актера:
watcher.underlyingActor.context.stop(watchee.ref)
Используя context
из TestActorRef
Я верю Terminated
будет доставлено все через CallingThreadDispatcher
и, таким образом, быть полностью синхронным. Я проверил это в цикле, и он работал для меня более 1000 итераций.
Теперь я подумал, что, может быть, потому что я выполнял stop
используя тот же актер, который ожидал Terminated
что, возможно, была оптимизация для доставки Terminated
сам для этого Scanario, поэтому я также проверил это с совершенно другим Actor
следующее:
class FooActor extends Actor{
def receive = {
case _ =>
}
Тогда в тестовом коде:
val foo = TestActorRef(new FooActor)
И на остановке:
foo.underlyingActor.context.stop(watchee.ref)
Это также сработало, как и ожидалось.