События не восстанавливаются в Akka 2.4.0 Persistence & Cassandra Journal Plugin 0.6
Я пытаюсь написать приложение, использующее Akka (версия 2.4.0) Persistency и плагин Cassandra (версия 0.6, https://github.com/krasserm/akka-persistence-cassandra) для восстановления после сбоев. События сохраняются на Кассандре без проблем, однако, я пытаюсь убить одного из них, и актер перезапускает его, события не принимаются receiveRecover
,
Кажется, что проблема связана с самим плагином, как будто я использую совместно используемую LevelDB вместо cassandra, события принимаются на этапе восстановления.
Вот реализация моего настойчивого актера:
class SimplePersistentActor extends PersistentActor with ActorLogging {
def persistenceId: String = context.self.path.name
override def preRestart(cause: Throwable, msg: Option[Any]) = {
log.debug(s"Restarting ${getClass.getSimpleName}")
super.preRestart(cause, msg)
}
override def postStop() = {
log.debug(s"Stopping ${getClass.getSimpleName}")
super.postStop()
}
var transactionData: Either[UninitializedData, RunningTransactionData] = Left(UninitializedData())
def receiveCommand ={
case msg @ TransactionStart(transactionId) =>
persist(msg) { _ => }
log.debug(s"Starting a transaction with id $transactionId")
transactionData = Right(RunningTransactionData(transactionId, List()))
/* Send a reply */
sender() ! transactionId
case msg @ TransactionData(data) =>
persist(msg) { _ => }
transactionData match {
case Right(t: RunningTransactionData) =>
val updatedTransaction = t.copy(data = t.data ::: List(data))
log.debug(s"There are ${updatedTransaction.data.size} data items within a transaction ${t.transactionId}")
transactionData = Right(updatedTransaction)
/* Send a reply */
sender() ! t.transactionId
case _ => log.error("Actor's transaction data is not initialized")
}
case TransactionEnd(transactionId) =>
transactionData match {
case Right(t: RunningTransactionData) =>
log.debug(s"Ending a transaction with id ${t.transactionId}")
transactionData = Left(UninitializedData())
/* Send a reply */
sender() ! t.transactionId
case _ => log.error("Actor's transaction data is not initialized")
}
case other =>
log.debug(s"Unexpected event received: $other")
}
def receiveRecover = {
case message =>
log.debug(s"Recovery Step. Message $message received")
}
}
В обоих случаях, которые я описал выше, код не меняется. Кто-нибудь видел эту проблему раньше?
1 ответ
Я столкнулся с тем же и нашел исправление, которое работает для меня. Я знаю, что ваш вопрос довольно старый, но так как я видел ту же проблему в последней версии Akka Cassandra Persistence (0.86), я подумал, что стоит упомянуть.
Проблема у меня возникла из следующего конфига.
cassandra-main-journal = ${cassandra-journal} {
contact-points = ["localhost"]
keyspace-autocreate = true
tables-autocreate = true
keyspace = "main_akka_journal"
}
Итак, по умолчанию cassandra-journal
настроить и переопределить keyspace
, Затем, как вы делаете, переопределение persistenceId
в постоянном актере Akka указать на этот конфиг.
Что произойдет, если вы сделаете это, все пишет актеру перейти на main_akka_journal
пространство ключей. При перезапуске Актера вы получаете RecoveryCompleted
сообщение, но вы не видите ни одного из написанных вами сообщений. Однако, когда вы получаете RecoveryCompleted
lastSequenceNr
будет правильно.
Что интересно, если у вас есть keyspace-autocreate=true
вы увидите два пространства клавиш, где созданы. main_akka_journal
а также akka
,
Так что проблема в том, что постоянный актер пишет в main_akka_journal
клавиш, при перезапуске он читает события из akka
пространство клавиш (которое пусто), а затем читать lastSequenceNr
от main_akka_journal
пространство клавиш (что правильно).
Решением для меня был этот конфиг:
cassandra-main-journal = ${cassandra-journal} {
contact-points = ["localhost"]
keyspace-autocreate = true
tables-autocreate = true
keyspace = "main_akka_journal"
query-plugin = "cassandra-main-query-plugin"
}
cassandra-main-query-plugin = ${cassandra-query-journal} {
write-plugin = "cassandra-main-journal"
}
В противном случае по умолчанию write-plugin
указывает на cassandra-journal
и akka
пространство ключей.