Акка Упорство Query и актерский осколок

Я делаю запрос стороны приложения актеров CQRS Akka.

Актеры запросов настроены как кластерный осколок и заполнены событиями из одного потока постоянных запросов.

Мои вопросы:

  1. Если один из актеров в кластере перезапускается, как его восстановить?

    • Выключить весь осколок кластера и ответить на все события?
    • Сделать акторов в кластере осколками постоянных актеров и сохранить новый набор событий только для стороны запроса?
  2. Если субъект, который является наполнителем с помощью Persistence Query, перезапускается, как я могу отменить текущий PQ и запустить его снова?

1 ответ

Решение

Как уже говорилось, я бы оценил сохранение вашей стороны запроса в базе данных.

Если это не вариант, и вы хотите придерживаться одного запроса на постоянство для каждого сегмента, выполните следующие действия в субъекте запросов:

var inRecovery: Boolean = true;

override def preStart( ) = {
    //Subscribe to your event live stream now, so you don't miss anything during recovery
    // e.g. send Subscription message to your persistence query actor

    //Re-Read everything up to now for recovery
    readJournal.currentEventsByPersistenceId("persistenceId")
        .watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
        .map(Replay.apply) // Mark your replay messages
        .runWith( Sink.actorRef( self, tag ) ) // Send all replay events to self
}

override def receive = {
    case Done => // Recovery is finished
        inRecovery = false
        unstashAll() // unstash all normal messages received during recovery

    case Replay( payload ) =>
        //handle replayed messages

    case events: Event =>
        //handle normal events from your persistence query
        inRecovery match {
            case true => stash() // stash normal messages until recovery is done
            case false => 
                // recovery is done, start handling normal events
        }
}


case class Replay( payload: AnyRef )

Таким образом, в основном, перед тем, как субъект начинает подписываться на субъект запроса постоянства и восстанавливает состояние с помощью конечного потока всех прошлых событий, который завершается после того, как все события пройдены. Во время восстановления спрятать все входящие события, которые не воспроизводятся события. Затем, после восстановления, распакуйте все и начните обрабатывать обычные сообщения.

Другие вопросы по тегам