Почему у Akka.net Streams низкая производительность

Итак, я прочитал 7 тем, связанных с производительностью потоков AKKA, но я все еще публикую этот вопрос, чтобы подтвердить свои выводы (что, выбирая использование потоков, вы, вероятно, получите снижение производительности в определенных ситуациях). [на этот поток, похоже, есть ссылка хотя бы один раз: https://stackoverflow.com/questions/33416891/akka-stream-implementation-slower-than-single-threaded-implementation]

Насколько я понимаю, это отчасти связано с продолжающимися накладными расходами на передачу сообщений между нисходящими компонентами и источником потока.

В моем случае я выполняю запрос сохранения состояния akka.net (по тегу) и выгружаю его непосредственно в приемник акторов. Модель чрезвычайно проста, и я думаю, что обработка сообщений в приемнике акторов будет чрезвычайно быстрой.

Я провел некоторое тестирование с помощью консольного приложения, и тестовое приложение обрабатывает примерно 25 сообщений в секунду. Когда одни и те же события используются для гидратации актера, который их создал, этот актер может обработать все свои события (~45000 из них) почти мгновенно.

Это несоответствие меня смущает. При «восстановлении» постоянного состояния актора система акторов .net может обрабатывать все 45000 сообщений почти мгновенно, но при запуске сохраненных событий в виде потока к другому актору (почти дубликат исходного постоянного актора счетчика) она получает только / обработка 25 сообщений каждую секунду. Может ли кто-нибудь просмотреть мой код ниже, чтобы убедиться, что я правильно реализую примеры, и дать мне какие-либо отзывы о том, почему так мучительно низкая производительность с запросом на сохранение, который я пытаюсь выполнить?

Журнал событий представляет собой локальную базу данных sqlite.

      // --STREAM CODE-- CALLED FROM A TESTING METHOD OUTSIDE OF COUNTER ACTOR
// QUERY IS ACCESSED BY CALLING A METHOD: START-QUERIES
private static void StartQueries()
{
    // USING SELECTASYNC & ASK
    readJournal.EventsByTag("CounterIncrementedEvent", Offset.Sequence(0L))
        .SelectAsync(5, envelope => nonPersistentCounterActor.Ask(envelope, TimeSpan.FromSeconds(3)))
        .RunWith(Sink.Ignore<object>(), mat);


    // USING ACTOR REF WITH ACK
    readJournal.EventsByTag("CounterIncrementedEvent", Offset.NoOffset())
        .RunWith(
                    Sink.ActorRefWithAck<EventEnvelope>(nonPersistentCounterActor,
                                            new CounterMessages.StreamInitialized(),
                                            CounterMessages.Ack.INSTANCE,
                                            new CounterMessages.StreamCompleteMessage(),
                                            ex => new CounterMessages.StreamFailureEvent(ex)
                                            ),
                    mat
                );
}


// --COUNTER ACTOR--
//THIS BEHAVIOR METHOD IS RUN IN COUNTER ACTOR'S CONSTRUCTOR
//THE --APPLY-- METHODS ONLY -- OR ++ OR / THE int COUNTER
private void Initalize()
{

    Receive<CounterMessages.StreamInitialized>(msg =>
    {
        Sender.Tell(CounterMessages.Ack.INSTANCE);
    });

    Receive<EventEnvelope>(env =>
    {
        switch (env.Event)
        {
            case CounterMessages.CounterIncrementedEvent ev:
                Apply(ev);
                break;
            case CounterMessages.CounterDecrementedEvent ev:
                Apply(ev);
                break;
            case CounterMessages.CounterResetEvent ev:
                Apply(ev);
                break;
            case CounterMessages.CounterDividedEvent ev:
                Apply(ev);
                break;
        }

        // tick is just used for testing to force a printout to the console
        tick++;
        //acknowledge stream message
        Sender.Tell(CounterMessages.Ack.INSTANCE, Self);
        // set offset to allow for a resumable projection later
        latestOffset = env.Offset;

        if(tick >= 100)
        {
            Console.WriteLine($@"=================>  MY NON-PERSISTENT COUNTER STATE IS CURRENTLY: {counter.ToString()}      at {System.DateTime.UtcNow}");
            Console.WriteLine($@"=================>  MY OFFSET IS NOW:    {latestOffset}");
            Console.WriteLine();
            Console.WriteLine();

            tick = 0;
        }
    });
}

Опять же, моя интерпретация заключается в том, что потоки действительно медленные в определенных контекстах, но большая разница между регидрацией актера (мгновенной) и обработкой потока (25 сообщений в секунду) кажется мне немного странной. Спасибо, что посмотрели и предложили свой опыт!

0 ответов

Другие вопросы по тегам