Оптимизация производительности агентов F# Akka.NET при синхронизации
Я пытаюсь решить следующую проблему. У меня есть некоторые агенты, работающие в режиме реального времени, с большим пульсом в пару миллисекунд, и порядок операций, которые они обрабатывают по этой причине, в основном детерминирован (поскольку обработка сообщений не является узким местом).
Сейчас я выполняю большое количество симуляций системы, в которых у меня больше нет сердцебиения (в противном случае это займет пару столетий), но мне нужно убедиться, что порядок операций сохранен. Для этого я принял следующее решение: симулятор проверяет, что каждый агент обработал свою очередь сообщений, отправляя фиктивное сообщение о синхронизации и блокируя его в ожидании ответа. Это работает для моего приложения, но время, которое требуется, не является интуитивно понятным - поскольку однопотоковая реализация будет на порядок быстрее (я думаю - x 100 ish - хотя я не проверял).
Я выделил небольшой тест, который показывает проблему, даже пытаясь использовать другую библиотеку, akka.net
type Greet =
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2
[<EntryPoint>]
let main argv =
let system = System.create "MySystem" <| Configuration.load()
let greeter = spawn system "greeter" <| fun mailbox ->
let rec loop() = actor {
let! msg = mailbox.Receive()
let sender = mailbox.Sender()
match msg with
| Greet who -> () // printf "Hello, %s!\n" who
| Hello2 -> sender.Tell(true)
| _ -> ()
return! loop()
}
loop()
let greeterF =
MailboxProcessor.Start
(fun inbox ->
async {
while true do
let! msg = inbox.Receive()
match msg with
| Greet who -> () // printf "Hello, %s!\n" who
| Hello reply -> reply.Reply true
| _ -> ()
}
)
let n = 1000000
let t1 = System.Diagnostics.Stopwatch()
t1.Start()
for i = 1 to n do
let rep = greeterF.PostAndReply(fun reply -> (Hello reply)) |> ignore
()
printfn "elapsed Mailbox:%A" t1.ElapsedMilliseconds
t1.Restart()
for i = 1 to n do
let res = greeter.Ask (Hello2)
let rr = res.Result
()
printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
System.Console.ReadLine () |> ignore
0
По сути, обе из них занимают около 10 секунд всего за 1 миллион синхронизаций, а не вычислений, которые когда-либо были задействованы, и это... неудачно.
Мне интересно, сталкивался ли кто-нибудь с такой же проблемой, и есть ли возможность отключить служебную нагрузку, заставляя все работать в однопоточном режиме... что-то вроде лучше, чем деактивация всех процессоров, кроме 1 в BIOS - или написание клона всей системы без агентов.
Любая помощь приветствуется.
3 ответа
Причина медленной версии Akka.NET заключается в том, как вы общаетесь с актером:
main process Task FutureActorRef !!ThreadPool!! greeter
Ask ---------------------->
Tell----------->
MailboxRun ----->
(greeter mailbox is empty) |
<--------------------------Tell
<--Complete task
<----------.Result
Для каждой итерации будет создана задача TPL
Затем единственное сообщение отправляется встречающему.
Затем основной процесс блокируется в ожидании ответа.
Ответивший отвечает, что, в свою очередь, завершает задачу внутри
FutureActorRef
Ополаскивание и повторение. Этот дизайн заставит Akka.NET запускать и останавливать "запуск почтового ящика" для каждого сообщения, поскольку очередь почтовых ящиков становится пустой для каждой итерации. Это приводит к планированию пула потоков для каждого передаваемого сообщения.
Это немного похоже на то, как когда вы садитесь в машину, нажимаете педаль до упора, затем резко останавливаетесь и выходите из машины, а затем снова повторяете процедуру. Это просто не очень эффективный способ быстро путешествовать.
Предложение @Aaronontheweb вступит в силу только в том случае, если вы решите вышеуказанные проблемы в своем коде. Почтовый ящик должен иметь возможность постоянно выбирать сообщения из внутренней очереди для работы с сообщениями в пакетном режиме для достижения полной пропускной способности.
Вместо этого отделите производителя от потребителя. Создайте актера, который слушает ответы вашего приветствующего. И как только этот субъект обработает ваши 1000000 сообщений, пусть этот субъект отправит сообщение WorkCompleted обратно потребителю.
[Править] Я сам попробовал, я не знаю F#, так что это может быть не совсем идиоматично:)
open Akka
open Akka.Actor
open Akka.FSharp
type Greet =
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2
type Consume =
| Response
| SetSender
[<EntryPoint>]
let main argv =
let system = System.create "MySystem" <| Configuration.load()
let greeter = spawn system "greeter" <| fun mailbox ->
let rec loop() = actor {
let! msg = mailbox.Receive()
let sender = mailbox.Sender()
match msg with
| Greet who -> () // printf "Hello, %s!\n" who
| Hello2 -> sender.Tell(Response)
| _ -> ()
return! loop()
}
loop()
let consumer = spawn system "consumer" <| fun mailbox ->
let rec loop(count,sender : IActorRef) = actor {
if count = 1000000 then sender.Tell(true)
let! msg = mailbox.Receive()
match msg with
| Response -> return! loop(count+1,sender)
| SetSender -> return! loop(count,mailbox.Sender())
}
loop(0,null)
let n = 1000000
let t1 = System.Diagnostics.Stopwatch()
t1.Start()
for i = 1 to n do
greeter.Tell(Hello2,consumer)
let workdone = consumer.Ask SetSender
workdone.Wait()
printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
System.Console.ReadLine () |> ignore
0
Я обновил ваш код, чтобы использовать отдельного потребителя для ответов актера, а затем отвечать обратно после обработки всех ответов.
Благодаря этому ваше время обработки на моей машине сократилось до 650 мс.
Если вы хотите повысить пропускную способность, вам нужно привлечь больше актеров для параллелизации.
Я не уверен, поможет ли это в вашем конкретном сценарии
Вот слегка модифицированный MailboxProcessor
версия:
module MBPAsync =
type Greet =
| Greet of string
| Hello of AsyncReplyChannel<bool>
let run n =
let timer = Stopwatch.StartNew ()
use greeter =
MailboxProcessor.Start <| fun inbox -> async {
while true do
let! msg = inbox.Receive()
match msg with
| Greet who -> () // printf "Hello, %s!\n" who
| Hello reply -> reply.Reply true
}
Async.RunSynchronously <| async {
for i = 1 to n do
do! Async.Ignore (greeter.PostAndAsyncReply Hello)
}
let elapsed = timer.Elapsed
printfn "%A" elapsed
Разница здесь в том, что эта версия использует PostAndAsyncReply
и сохраняет вычисления в асинхронном рабочем процессе. На моем быстром тесте это было намного быстрее, чем при использовании PostAndReply
, но YMMV.
Время, которое я получаю из вышеуказанной версии MBP, выглядит примерно так:
> MBPAsync.run 1000000 ;;
00:00:02.6883486
val it : unit = ()
В комментарии ранее упоминалась моя библиотека Hopac. Вот оптимизированная версия с использованием Hopac:
module Hop =
type Greet =
| Greet of string
| Hello of IVar<bool>
let run n =
let timer = Stopwatch.StartNew ()
let greeterCh = ch ()
do greeterCh >>= function
| Greet who -> Job.unit ()
| Hello reply -> reply <-= true
|> Job.forever
|> server
Job.forUpToIgnore 1 n <| fun _ ->
let reply = ivar ()
greeterCh <-- Hello reply >>.
reply
|> run
let elapsed = timer.Elapsed
printfn "%A" elapsed
Время, которое я получаю из вышеупомянутой версии Hopac, выглядит примерно так:
> Hop.run 1000000 ;;
00:00:00.1088768
val it : unit = ()
Я не F# разработчик, но я главный разработчик на Akka.NET. Пара идей для вашего сценария:
Если вы используете только одного актера для этой работы, вы можете попробовать использовать
PinnedDispatcher
- таким образом, актер все время работает в своем собственном потоке. Это избавит вас от ненужных переключений контекста.Вы также можете установить пропускную способность почтового ящика, чтобы быть намного выше для этого
PinnedDispatcher
чем нормальные настройки. т.е. установите значение пропускной способности 10000 (или что-то) вместо обычного 25. Предполагая, что содержимое вашего почтового ящика растет большими пакетами, это должно сэкономить вам на накладных расходах синхронизации почтового ящика.
Вот как может выглядеть ваша конфигурация диспетчера:
my-pinned-dispatcher {
type = PinnedDispatcher
throughput = 1000 #your mileage may vary
}
А затем настроить актера, чтобы использовать его
Свободный интерфейс C#
var myActor = myActorSystem.ActorOf(Props.Create<FooActor>()
.WithDispatcher("my-pinned-dispatcher");
конфиг
akka.actor.deployment{
/greeter{
dispatcher = my-pinned-dispatcher
}
}
Обе эти опции вы можете настроить через HOCON в App.config или Web.config, или вы можете использовать свободный интерфейс на Props
класс, чтобы сделать это. Также стоит отметить: в данный момент есть ошибка с закрепленными диспетчерами, но она должна быть исправлена в нашем следующем выпуске (v1.0.1,), который должен выйти на следующей неделе.
Ваш пробег может варьироваться, но это то, что я бы попробовал - в основном он просто разработан, чтобы помочь уменьшить раздоры и накладные расходы вокруг одного актера.