Почему Akka.Persistance не воспроизводит записи моего журнала

Я пишу реализацию Akka.Persistence for Service Fabric, и мне кажется, что я не могу заставить работать моментальный снимок. Когда он пытается восстановить состояние, он получает последний снимок, но не воспроизводит события с момента последнего снимка. Мне не ясно, правильно ли я подключил компоненты или если моя реализация библиотеки постоянства неверна. Мой актер - простой счетчик, мое состояние - просто текущий счетчик. Я ожидаю, что сначала будет вызвано восстановление, а затем восстановление будет вызываться для каждой записи журнала между последним снимком и наивысшим порядковым номером. В журнале есть функция ReplayMessagesAsync (...), которая выглядит так, как будто должна делать это, но не вызывается. Код для моего счетчика ниже, остальная часть моего кода: Код

using Akka.Actor;
using Akka.Persistence;
using Akka.Persistence.ServiceFabric.Journal;
using Akka.Persistence.ServiceFabric.Snapshot;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AkkaPersistence.Actors
{
    public class Counter : ReceivePersistentActor
    {
        public class GetCount { }

        private int counter;

        private CounterState State = new CounterState();

        private int _msgsSinceLastSnapshot = 0;

        public Counter()
        {
            Recover<Evt>(evt =>
            {
                State.Update(evt);
            });

            Recover<SnapshotOffer>(offer => {
                var snapshotEntry = offer.Snapshot as SnapshotEntry;
                if (snapshotEntry != null)
                {
                    State = (CounterState)snapshotEntry.Snapshot;
                }
            });

            Command<string>(str => Persist(str, s =>
            {
                ++counter;
                var evt = new Evt(s);
                State.Update(evt);

                if (++_msgsSinceLastSnapshot % 10 == 0)
                {
                    //time to save a snapshot
                    SaveSnapshot(State.Copy());
                }
            }));

            Command<GetCount>(get => Sender.Tell(State.Count));

            Command<SaveSnapshotSuccess>(success =>
            {
                ServiceEventSource.Current.Message($"Saved snapshot");
                DeleteMessages(success.Metadata.SequenceNr);
            });

            Command<SaveSnapshotFailure>(failure => {
                // handle snapshot save failure...
                ServiceEventSource.Current.Message($"Snapshot failure");
            });

        }

        public override string PersistenceId
        {
            get
            {
                return "counter";
            }
        }
    }

    internal class CounterState
    {
        private long count = 0L;

        public long Count
        {
            get { return count; }
            set { count = value; }
        }

        public CounterState(long count)
        {
            this.Count = count;
        }

        public CounterState() : this(0)
        {
        }

        public CounterState Copy()
        {
            return new CounterState(count);
        }

        public void Update(Evt evt)
        {
            ++Count;
        }
    }

    public class Evt
    {
        public Evt(string data)
        {
            Data = data;
        }

        public string Data { get; }

    }

    public class Cmd
    {
        public Cmd(string data)
        {
            Data = data;
        }

        public string Data { get; }
    }

}

1 ответ

Решение

Было несколько вещей, которые я ошибался: 1) мне нужно было возвращать то, что было передано, а не мой SnapshotEntry, который является деталью реализации моего механизма персистентности. 2) Простая ошибка, когда я перешел от сохранения строк к попытке сохранить объекты как часть журнала. 3) Наконец, была еще одна проблема, которая лежала в основе, и заключалась в том, что сериализация не удалась с дочерними объектами. В этом фрагменте кода я не хотел включать тип дочернего объекта, поэтому вместо этого я добавил настраиваемый сериализатор ( сериализатор Wire) для журнала, а также уже существующий SnapshotSerializer, и он теперь работает.

Другие вопросы по тегам