Почему у Akka.net Streams низкая производительность
Итак, я прочитал 7 тем, связанных с производительностью потоков AKKA, но я все еще публикую этот вопрос, чтобы подтвердить свои выводы (что, выбирая использование потоков, вы, вероятно, получите снижение производительности в определенных ситуациях). [на этот поток, похоже, есть ссылка хотя бы один раз: https://stackoverflow.com/questions/33416891/akka-stream-implementation-slower-than-single-threaded-implementation]
Насколько я понимаю, это отчасти связано с продолжающимися накладными расходами на передачу сообщений между нисходящими компонентами и источником потока.
В моем случае я выполняю запрос сохранения состояния akka.net (по тегу) и выгружаю его непосредственно в приемник акторов. Модель чрезвычайно проста, и я думаю, что обработка сообщений в приемнике акторов будет чрезвычайно быстрой.
Я провел некоторое тестирование с помощью консольного приложения, и тестовое приложение обрабатывает примерно 25 сообщений в секунду. Когда одни и те же события используются для гидратации актера, который их создал, этот актер может обработать все свои события (~45000 из них) почти мгновенно.
Это несоответствие меня смущает. При «восстановлении» постоянного состояния актора система акторов .net может обрабатывать все 45000 сообщений почти мгновенно, но при запуске сохраненных событий в виде потока к другому актору (почти дубликат исходного постоянного актора счетчика) она получает только / обработка 25 сообщений каждую секунду. Может ли кто-нибудь просмотреть мой код ниже, чтобы убедиться, что я правильно реализую примеры, и дать мне какие-либо отзывы о том, почему так мучительно низкая производительность с запросом на сохранение, который я пытаюсь выполнить?
Журнал событий представляет собой локальную базу данных sqlite.
// --STREAM CODE-- CALLED FROM A TESTING METHOD OUTSIDE OF COUNTER ACTOR
// QUERY IS ACCESSED BY CALLING A METHOD: START-QUERIES
private static void StartQueries()
{
// USING SELECTASYNC & ASK
readJournal.EventsByTag("CounterIncrementedEvent", Offset.Sequence(0L))
.SelectAsync(5, envelope => nonPersistentCounterActor.Ask(envelope, TimeSpan.FromSeconds(3)))
.RunWith(Sink.Ignore<object>(), mat);
// USING ACTOR REF WITH ACK
readJournal.EventsByTag("CounterIncrementedEvent", Offset.NoOffset())
.RunWith(
Sink.ActorRefWithAck<EventEnvelope>(nonPersistentCounterActor,
new CounterMessages.StreamInitialized(),
CounterMessages.Ack.INSTANCE,
new CounterMessages.StreamCompleteMessage(),
ex => new CounterMessages.StreamFailureEvent(ex)
),
mat
);
}
// --COUNTER ACTOR--
//THIS BEHAVIOR METHOD IS RUN IN COUNTER ACTOR'S CONSTRUCTOR
//THE --APPLY-- METHODS ONLY -- OR ++ OR / THE int COUNTER
private void Initalize()
{
Receive<CounterMessages.StreamInitialized>(msg =>
{
Sender.Tell(CounterMessages.Ack.INSTANCE);
});
Receive<EventEnvelope>(env =>
{
switch (env.Event)
{
case CounterMessages.CounterIncrementedEvent ev:
Apply(ev);
break;
case CounterMessages.CounterDecrementedEvent ev:
Apply(ev);
break;
case CounterMessages.CounterResetEvent ev:
Apply(ev);
break;
case CounterMessages.CounterDividedEvent ev:
Apply(ev);
break;
}
// tick is just used for testing to force a printout to the console
tick++;
//acknowledge stream message
Sender.Tell(CounterMessages.Ack.INSTANCE, Self);
// set offset to allow for a resumable projection later
latestOffset = env.Offset;
if(tick >= 100)
{
Console.WriteLine($@"=================> MY NON-PERSISTENT COUNTER STATE IS CURRENTLY: {counter.ToString()} at {System.DateTime.UtcNow}");
Console.WriteLine($@"=================> MY OFFSET IS NOW: {latestOffset}");
Console.WriteLine();
Console.WriteLine();
tick = 0;
}
});
}
Опять же, моя интерпретация заключается в том, что потоки действительно медленные в определенных контекстах, но большая разница между регидрацией актера (мгновенной) и обработкой потока (25 сообщений в секунду) кажется мне немного странной. Спасибо, что посмотрели и предложили свой опыт!