Почему Akka.Persistance не воспроизводит записи моего журнала
Я пишу реализацию Akka.Persistence for Service Fabric, и мне кажется, что я не могу заставить работать моментальный снимок. Когда он пытается восстановить состояние, он получает последний снимок, но не воспроизводит события с момента последнего снимка. Мне не ясно, правильно ли я подключил компоненты или если моя реализация библиотеки постоянства неверна. Мой актер - простой счетчик, мое состояние - просто текущий счетчик. Я ожидаю, что сначала будет вызвано восстановление, а затем восстановление будет вызываться для каждой записи журнала между последним снимком и наивысшим порядковым номером. В журнале есть функция ReplayMessagesAsync (...), которая выглядит так, как будто должна делать это, но не вызывается. Код для моего счетчика ниже, остальная часть моего кода: Код
using Akka.Actor;
using Akka.Persistence;
using Akka.Persistence.ServiceFabric.Journal;
using Akka.Persistence.ServiceFabric.Snapshot;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AkkaPersistence.Actors
{
public class Counter : ReceivePersistentActor
{
public class GetCount { }
private int counter;
private CounterState State = new CounterState();
private int _msgsSinceLastSnapshot = 0;
public Counter()
{
Recover<Evt>(evt =>
{
State.Update(evt);
});
Recover<SnapshotOffer>(offer => {
var snapshotEntry = offer.Snapshot as SnapshotEntry;
if (snapshotEntry != null)
{
State = (CounterState)snapshotEntry.Snapshot;
}
});
Command<string>(str => Persist(str, s =>
{
++counter;
var evt = new Evt(s);
State.Update(evt);
if (++_msgsSinceLastSnapshot % 10 == 0)
{
//time to save a snapshot
SaveSnapshot(State.Copy());
}
}));
Command<GetCount>(get => Sender.Tell(State.Count));
Command<SaveSnapshotSuccess>(success =>
{
ServiceEventSource.Current.Message($"Saved snapshot");
DeleteMessages(success.Metadata.SequenceNr);
});
Command<SaveSnapshotFailure>(failure => {
// handle snapshot save failure...
ServiceEventSource.Current.Message($"Snapshot failure");
});
}
public override string PersistenceId
{
get
{
return "counter";
}
}
}
internal class CounterState
{
private long count = 0L;
public long Count
{
get { return count; }
set { count = value; }
}
public CounterState(long count)
{
this.Count = count;
}
public CounterState() : this(0)
{
}
public CounterState Copy()
{
return new CounterState(count);
}
public void Update(Evt evt)
{
++Count;
}
}
public class Evt
{
public Evt(string data)
{
Data = data;
}
public string Data { get; }
}
public class Cmd
{
public Cmd(string data)
{
Data = data;
}
public string Data { get; }
}
}
1 ответ
Было несколько вещей, которые я ошибался: 1) мне нужно было возвращать то, что было передано, а не мой SnapshotEntry, который является деталью реализации моего механизма персистентности. 2) Простая ошибка, когда я перешел от сохранения строк к попытке сохранить объекты как часть журнала. 3) Наконец, была еще одна проблема, которая лежала в основе, и заключалась в том, что сериализация не удалась с дочерними объектами. В этом фрагменте кода я не хотел включать тип дочернего объекта, поэтому вместо этого я добавил настраиваемый сериализатор ( сериализатор Wire) для журнала, а также уже существующий SnapshotSerializer, и он теперь работает.