Акка Упорство Query и актерский осколок
Я делаю запрос стороны приложения актеров CQRS Akka.
Актеры запросов настроены как кластерный осколок и заполнены событиями из одного потока постоянных запросов.
Мои вопросы:
Если один из актеров в кластере перезапускается, как его восстановить?
- Выключить весь осколок кластера и ответить на все события?
- Сделать акторов в кластере осколками постоянных актеров и сохранить новый набор событий только для стороны запроса?
Если субъект, который является наполнителем с помощью Persistence Query, перезапускается, как я могу отменить текущий PQ и запустить его снова?
1 ответ
Как уже говорилось, я бы оценил сохранение вашей стороны запроса в базе данных.
Если это не вариант, и вы хотите придерживаться одного запроса на постоянство для каждого сегмента, выполните следующие действия в субъекте запросов:
var inRecovery: Boolean = true;
override def preStart( ) = {
//Subscribe to your event live stream now, so you don't miss anything during recovery
// e.g. send Subscription message to your persistence query actor
//Re-Read everything up to now for recovery
readJournal.currentEventsByPersistenceId("persistenceId")
.watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
.map(Replay.apply) // Mark your replay messages
.runWith( Sink.actorRef( self, tag ) ) // Send all replay events to self
}
override def receive = {
case Done => // Recovery is finished
inRecovery = false
unstashAll() // unstash all normal messages received during recovery
case Replay( payload ) =>
//handle replayed messages
case events: Event =>
//handle normal events from your persistence query
inRecovery match {
case true => stash() // stash normal messages until recovery is done
case false =>
// recovery is done, start handling normal events
}
}
case class Replay( payload: AnyRef )
Таким образом, в основном, перед тем, как субъект начинает подписываться на субъект запроса постоянства и восстанавливает состояние с помощью конечного потока всех прошлых событий, который завершается после того, как все события пройдены. Во время восстановления спрятать все входящие события, которые не воспроизводятся события. Затем, после восстановления, распакуйте все и начните обрабатывать обычные сообщения.