Читатель журнала Akka.net пропустил события

В нашем приложении мы используем Akka.net, с источником событий. Постоянные участники сохраняют свои события в базе данных SQL Server. У нас также есть акторы представлений, которые подписываются на эти события, используя запрос чтения журнала / постоянства, для создания материализованных представлений. У нас есть таблица в базе данных, в которой есть строка для каждого действующего лица. Эта строка содержит имя актера представления и смещение последнего обработанного события. На первый взгляд, это работает гладко. Однако иногда, когда мы запускаем тест, который приводит к тысячам событий, читатель журнала пропускает некоторые события.

Представителем представления является ReceiveActor. При запуске он извлекает последнее обработанное смещение события из базы данных (вызывается из конструктора актера). Смещение передается в себя в OffsetMessage. При получении сообщения OffsetMessage субъект представления инициализирует программу чтения журнала. При получении событий (в сообщениях EventEnvelope) представления обновляются.

Действие, которое запускается из программы чтения журнала, сначала записывает строку в журнал. Эта строка содержит смещение события. Обработчик получения EventEnvelope также записывает строку в журнал. Эта строка также содержит смещение события.

У нас есть тест, в результате которого в журнал вставляется событие 9635. Иногда программа чтения журнала и обработчик получения EventEnvelope регистрируют менее 9635 событий. Они оба записывают одинаковые номера, поэтому читатель журнала пропускает события. Пропущенные события из журнала соответствуют отсутствующим элементам в представлениях. Мы запускаем тест на пустой базе данных. Ведение журнала находится на уровне отладки и не показывает исключений. Пропущенные события (мы видели числа от 1 до 4) могут быть среди первых, средних или последних событий. Каждый раз это по-другому.

Пока что мы не знаем, что является причиной этой проблемы или как ее можно решить.

Ниже приведены фрагменты нашего кода. Все акторы представления наследуются от базового класса: ViewActorBase.

internal abstract class ViewActorBase : ReceiveActor, ILogReceive
{
    public ViewActorBase()
    {
        // Some initialisation code
        ....

        this.Receive<OffsetMessage>(this.HandleOffsetMessage);
        this.ReceiveAsync<EventEnvelope>(this.UpdateState);

        var sender = this.Sender;
        var self = this.Self;
        this.GetViewActorOffset(self, sender);
    }

    private void HandleOffsetMessage(OffsetMessage offsetMessage)
    {
        this.InitialiseJournalReader(offsetMessage.Offset);
    }

    private void InitialiseJournalReader(long offset)
    {
        // obtain read journal by plugin id
        var readJournal = PersistenceQuery.Get(Context.System).ReadJournalFor<SqlReadJournal>($"akka.persistence.query");

        // materialize stream, consuming events
        var materializer = ActorMaterializer.Create(Context.System);

        // issue query to journal
        Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag(this.QueryEventTag, new Sequence(offset));

        var self = this.Self;
        source.RunForeach(envelope => { this.Logger.Debug("{Date:HH:mm:ss.fffff} JournalReader.Tell {Offset}", DateTime.Now, (envelope.Offset as Sequence).Value); self.Tell(envelope); }, materializer);
    }

    private void GetViewActorOffset(IActorRef self, IActorRef sender)
    {
        // Initialise repository
        ....

        repository.GetViewActorOffset(this.GetViewName()).PipeTo(self, sender, offset => new OffsetMessage(offset));
    }
}

internal class MyViewActor : ViewActorBase
{
    protected override async Task UpdateState(EventEnvelope envelope)
    {
        var offset = (envelope.Offset as Sequence).Value;

        this.Logger.Debug("{Date:HH:mm:ss.fffff} {MethodName} {Offset}", DateTime.Now, $"{this.GetType().Name}.UpdateState", offset);

        // Update views
        ....
    }
}

Что-то не так в нашем коде или архитектуре? Есть ли лучшие решения?

Дополнительная информация Мы провели несколько тестов с SQL Server Profiler, отслеживая запросы к базе данных.

В журнале событий был выполнен запрос, запрашивающий 100 событий, начиная со смещения 204743. Результат содержал 61 строку.

<Event id="10" name="RPC:Completed">
  <Column id="1" name="TextData">exec sp_executesql N'
        SELECT TOP (@Take)
        e.PersistenceId as PersistenceId, 
        e.SequenceNr as SequenceNr, 
        e.Timestamp as Timestamp, 
        e.IsDeleted as IsDeleted, 
        e.Manifest as Manifest, 
        e.Payload as Payload,
        e.SerializerId as SerializerId,
        e.Ordering as Ordering
        FROM dbo.EventJournal e
        WHERE e.Ordering &gt; @Ordering AND e.Tags LIKE @Tag
        ORDER BY Ordering ASC
        ',N'@Tag nvarchar(10),@Ordering bigint,@Take bigint',@Tag=N'%;Module;%',@Ordering=204743,@Take=100</Column>
  <Column id="9" name="ClientProcessID">1169425116</Column>
  <Column id="10" name="ApplicationName">Core .Net SqlClient Data Provider</Column>
  <Column id="12" name="SPID">82</Column>
  <Column id="13" name="Duration">353890</Column>
  <Column id="14" name="StartTime">2018-08-30T16:32:32.927+02:00</Column>
  <Column id="15" name="EndTime">2018-08-30T16:32:33.28+02:00</Column>
  <Column id="16" name="Reads">326</Column>
  <Column id="17" name="Writes">0</Column>
  <Column id="18" name="CPU">0</Column>
  <Column id="48" name="RowCounts">61</Column>
</Event>

Мы ожидали, что следующий запрос начнется с 204804 (204743 + 61). Тем не менее, он начался в 204810. Почему он пропускает (или пропускает) 6 событий?

0 ответов

Другие вопросы по тегам