События не восстанавливаются в Akka 2.4.0 Persistence & Cassandra Journal Plugin 0.6

Я пытаюсь написать приложение, использующее Akka (версия 2.4.0) Persistency и плагин Cassandra (версия 0.6, https://github.com/krasserm/akka-persistence-cassandra) для восстановления после сбоев. События сохраняются на Кассандре без проблем, однако, я пытаюсь убить одного из них, и актер перезапускает его, события не принимаются receiveRecover,

Кажется, что проблема связана с самим плагином, как будто я использую совместно используемую LevelDB вместо cassandra, события принимаются на этапе восстановления.

Вот реализация моего настойчивого актера:



    class SimplePersistentActor extends PersistentActor with ActorLogging {

      def persistenceId: String = context.self.path.name

      override def preRestart(cause: Throwable, msg: Option[Any]) = {
        log.debug(s"Restarting ${getClass.getSimpleName}")
        super.preRestart(cause, msg)
      }

      override def postStop() = {
        log.debug(s"Stopping ${getClass.getSimpleName}")
        super.postStop()
      }

      var transactionData: Either[UninitializedData, RunningTransactionData] = Left(UninitializedData())

      def receiveCommand ={
        case msg @ TransactionStart(transactionId) =>
          persist(msg) { _ => }
          log.debug(s"Starting a transaction with id $transactionId")
          transactionData = Right(RunningTransactionData(transactionId, List()))

          /* Send a reply */
          sender() ! transactionId

        case msg @ TransactionData(data) =>
          persist(msg) { _ => }

          transactionData match {
            case Right(t: RunningTransactionData) =>
              val updatedTransaction = t.copy(data = t.data ::: List(data))
              log.debug(s"There are ${updatedTransaction.data.size} data items within a transaction ${t.transactionId}")
              transactionData = Right(updatedTransaction)

              /* Send a reply */
              sender() ! t.transactionId          

            case _ => log.error("Actor's transaction data is not initialized")
          }

        case TransactionEnd(transactionId) =>
          transactionData match {
            case Right(t: RunningTransactionData) =>
              log.debug(s"Ending a transaction with id ${t.transactionId}")
              transactionData = Left(UninitializedData())

              /* Send a reply */
              sender() ! t.transactionId

            case _ => log.error("Actor's transaction data is not initialized")
          }      

        case other =>
          log.debug(s"Unexpected event received: $other")
      }

      def receiveRecover = {
        case message =>
          log.debug(s"Recovery Step. Message $message received")
      }
    }

В обоих случаях, которые я описал выше, код не меняется. Кто-нибудь видел эту проблему раньше?

1 ответ

Я столкнулся с тем же и нашел исправление, которое работает для меня. Я знаю, что ваш вопрос довольно старый, но так как я видел ту же проблему в последней версии Akka Cassandra Persistence (0.86), я подумал, что стоит упомянуть.

Проблема у меня возникла из следующего конфига.

cassandra-main-journal = ${cassandra-journal} {
  contact-points = ["localhost"]
  keyspace-autocreate = true
  tables-autocreate = true
  keyspace = "main_akka_journal"
}

Итак, по умолчанию cassandra-journal настроить и переопределить keyspace, Затем, как вы делаете, переопределение persistenceId в постоянном актере Akka указать на этот конфиг.

Что произойдет, если вы сделаете это, все пишет актеру перейти на main_akka_journal пространство ключей. При перезапуске Актера вы получаете RecoveryCompleted сообщение, но вы не видите ни одного из написанных вами сообщений. Однако, когда вы получаете RecoveryCompleted lastSequenceNr будет правильно.

Что интересно, если у вас есть keyspace-autocreate=true вы увидите два пространства клавиш, где созданы. main_akka_journal а также akka,

Так что проблема в том, что постоянный актер пишет в main_akka_journal клавиш, при перезапуске он читает события из akka пространство клавиш (которое пусто), а затем читать lastSequenceNr от main_akka_journal пространство клавиш (что правильно).

Решением для меня был этот конфиг:

cassandra-main-journal = ${cassandra-journal} {
  contact-points = ["localhost"]
  keyspace-autocreate = true
  tables-autocreate = true
  keyspace = "main_akka_journal"
  query-plugin = "cassandra-main-query-plugin"
}

cassandra-main-query-plugin = ${cassandra-query-journal} {
  write-plugin = "cassandra-main-journal"
}

В противном случае по умолчанию write-plugin указывает на cassandra-journal и akka пространство ключей.

Другие вопросы по тегам