Оптимизация производительности агентов F# Akka.NET при синхронизации

Я пытаюсь решить следующую проблему. У меня есть некоторые агенты, работающие в режиме реального времени, с большим пульсом в пару миллисекунд, и порядок операций, которые они обрабатывают по этой причине, в основном детерминирован (поскольку обработка сообщений не является узким местом).

Сейчас я выполняю большое количество симуляций системы, в которых у меня больше нет сердцебиения (в противном случае это займет пару столетий), но мне нужно убедиться, что порядок операций сохранен. Для этого я принял следующее решение: симулятор проверяет, что каждый агент обработал свою очередь сообщений, отправляя фиктивное сообщение о синхронизации и блокируя его в ожидании ответа. Это работает для моего приложения, но время, которое требуется, не является интуитивно понятным - поскольку однопотоковая реализация будет на порядок быстрее (я думаю - x 100 ish - хотя я не проверял).

Я выделил небольшой тест, который показывает проблему, даже пытаясь использовать другую библиотеку, akka.net

type Greet = 
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2  

[<EntryPoint>]
let main argv =
    let system = System.create "MySystem" <| Configuration.load()    
    let greeter = spawn system "greeter" <| fun mailbox ->
        let rec loop() = actor {
            let! msg = mailbox.Receive()
            let sender = mailbox.Sender()
            match msg with
                | Greet who -> () // printf "Hello, %s!\n" who
                | Hello2 -> sender.Tell(true)
                | _ -> ()
            return! loop()
            }
        loop()

    let greeterF =
        MailboxProcessor.Start
            (fun inbox ->                
                async {
                    while true do
                        let! msg = inbox.Receive()
                        match msg with
                        | Greet who -> () // printf "Hello, %s!\n" who
                        | Hello reply -> reply.Reply true
                        | _ -> ()
                    }
            )

    let n = 1000000

    let t1 = System.Diagnostics.Stopwatch()
    t1.Start()
    for i = 1 to n do
        let rep = greeterF.PostAndReply(fun reply -> (Hello reply)) |> ignore
        ()

    printfn "elapsed Mailbox:%A" t1.ElapsedMilliseconds

    t1.Restart()

    for i = 1 to n do        
        let res = greeter.Ask (Hello2)
        let rr = res.Result
        ()

    printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
    System.Console.ReadLine () |> ignore

    0

По сути, обе из них занимают около 10 секунд всего за 1 миллион синхронизаций, а не вычислений, которые когда-либо были задействованы, и это... неудачно.

Мне интересно, сталкивался ли кто-нибудь с такой же проблемой, и есть ли возможность отключить служебную нагрузку, заставляя все работать в однопоточном режиме... что-то вроде лучше, чем деактивация всех процессоров, кроме 1 в BIOS - или написание клона всей системы без агентов.

Любая помощь приветствуется.

3 ответа

Причина медленной версии Akka.NET заключается в том, как вы общаетесь с актером:

main process    Task     FutureActorRef  !!ThreadPool!!   greeter
    Ask ---------------------->
                              Tell-----------> 
                                             MailboxRun ----->
                                 (greeter mailbox is empty)  |                 
                               <--------------------------Tell 
                  <--Complete task
    <----------.Result
  1. Для каждой итерации будет создана задача TPL

  2. Затем единственное сообщение отправляется встречающему.

  3. Затем основной процесс блокируется в ожидании ответа.

  4. Ответивший отвечает, что, в свою очередь, завершает задачу внутри FutureActorRef

Ополаскивание и повторение. Этот дизайн заставит Akka.NET запускать и останавливать "запуск почтового ящика" для каждого сообщения, поскольку очередь почтовых ящиков становится пустой для каждой итерации. Это приводит к планированию пула потоков для каждого передаваемого сообщения.

Это немного похоже на то, как когда вы садитесь в машину, нажимаете педаль до упора, затем резко останавливаетесь и выходите из машины, а затем снова повторяете процедуру. Это просто не очень эффективный способ быстро путешествовать.

Предложение @Aaronontheweb вступит в силу только в том случае, если вы решите вышеуказанные проблемы в своем коде. Почтовый ящик должен иметь возможность постоянно выбирать сообщения из внутренней очереди для работы с сообщениями в пакетном режиме для достижения полной пропускной способности.

Вместо этого отделите производителя от потребителя. Создайте актера, который слушает ответы вашего приветствующего. И как только этот субъект обработает ваши 1000000 сообщений, пусть этот субъект отправит сообщение WorkCompleted обратно потребителю.

[Править] Я сам попробовал, я не знаю F#, так что это может быть не совсем идиоматично:)

open Akka
open Akka.Actor
open Akka.FSharp

type Greet = 
| Greet of string
| Hello of AsyncReplyChannel<bool>
| Hello2 

type Consume =
| Response
| SetSender

[<EntryPoint>]
let main argv =

    let system = System.create "MySystem" <| Configuration.load()    
    let greeter = spawn system "greeter" <| fun mailbox ->
        let rec loop() = actor {
            let! msg = mailbox.Receive()
            let sender = mailbox.Sender()
            match msg with
                | Greet who -> () // printf "Hello, %s!\n" who
                | Hello2 -> sender.Tell(Response)
                | _ -> ()
            return! loop()
            }
        loop()

    let consumer = spawn system "consumer" <| fun mailbox ->
        let rec loop(count,sender : IActorRef) = actor {
            if count = 1000000 then sender.Tell(true)
            let! msg = mailbox.Receive()
            match msg with
            | Response -> return! loop(count+1,sender)
            | SetSender -> return! loop(count,mailbox.Sender())

        }  
        loop(0,null)      

    let n = 1000000

    let t1 = System.Diagnostics.Stopwatch()
    t1.Start()   
    for i = 1 to n do        
        greeter.Tell(Hello2,consumer)

    let workdone = consumer.Ask SetSender
    workdone.Wait()

    printfn "elapsed Akka:%A" t1.ElapsedMilliseconds
    System.Console.ReadLine () |> ignore

    0

Я обновил ваш код, чтобы использовать отдельного потребителя для ответов актера, а затем отвечать обратно после обработки всех ответов.

Благодаря этому ваше время обработки на моей машине сократилось до 650 мс.

Если вы хотите повысить пропускную способность, вам нужно привлечь больше актеров для параллелизации.

Я не уверен, поможет ли это в вашем конкретном сценарии

Вот слегка модифицированный MailboxProcessor версия:

module MBPAsync =
  type Greet = 
   | Greet of string
   | Hello of AsyncReplyChannel<bool>

  let run n =
    let timer = Stopwatch.StartNew ()

    use greeter =
      MailboxProcessor.Start <| fun inbox -> async {
        while true do
          let! msg = inbox.Receive()
          match msg with
           | Greet who -> () // printf "Hello, %s!\n" who
           | Hello reply -> reply.Reply true
      }

    Async.RunSynchronously <| async {
      for i = 1 to n do
        do! Async.Ignore (greeter.PostAndAsyncReply Hello)
    }

    let elapsed = timer.Elapsed
    printfn "%A" elapsed

Разница здесь в том, что эта версия использует PostAndAsyncReply и сохраняет вычисления в асинхронном рабочем процессе. На моем быстром тесте это было намного быстрее, чем при использовании PostAndReply, но YMMV.

Время, которое я получаю из вышеуказанной версии MBP, выглядит примерно так:

> MBPAsync.run 1000000 ;;
00:00:02.6883486
val it : unit = ()

В комментарии ранее упоминалась моя библиотека Hopac. Вот оптимизированная версия с использованием Hopac:

module Hop =
  type Greet = 
   | Greet of string
   | Hello of IVar<bool>

  let run n =
    let timer = Stopwatch.StartNew ()

    let greeterCh = ch ()
    do greeterCh >>= function
          | Greet who -> Job.unit ()
          | Hello reply -> reply <-= true
       |> Job.forever
       |> server

    Job.forUpToIgnore 1 n <| fun _ ->
        let reply = ivar ()
        greeterCh <-- Hello reply >>.
        reply
    |> run

    let elapsed = timer.Elapsed
    printfn "%A" elapsed

Время, которое я получаю из вышеупомянутой версии Hopac, выглядит примерно так:

> Hop.run 1000000 ;;
00:00:00.1088768
val it : unit = ()

Я не F# разработчик, но я главный разработчик на Akka.NET. Пара идей для вашего сценария:

  1. Если вы используете только одного актера для этой работы, вы можете попробовать использовать PinnedDispatcher - таким образом, актер все время работает в своем собственном потоке. Это избавит вас от ненужных переключений контекста.

  2. Вы также можете установить пропускную способность почтового ящика, чтобы быть намного выше для этого PinnedDispatcher чем нормальные настройки. т.е. установите значение пропускной способности 10000 (или что-то) вместо обычного 25. Предполагая, что содержимое вашего почтового ящика растет большими пакетами, это должно сэкономить вам на накладных расходах синхронизации почтового ящика.

Вот как может выглядеть ваша конфигурация диспетчера:

 my-pinned-dispatcher {
      type = PinnedDispatcher
      throughput = 1000 #your mileage may vary
 }

А затем настроить актера, чтобы использовать его

Свободный интерфейс C#

var myActor = myActorSystem.ActorOf(Props.Create<FooActor>()
.WithDispatcher("my-pinned-dispatcher");

конфиг

akka.actor.deployment{
   /greeter{
     dispatcher = my-pinned-dispatcher
   }
}

Обе эти опции вы можете настроить через HOCON в App.config или Web.config, или вы можете использовать свободный интерфейс на Props класс, чтобы сделать это. Также стоит отметить: в данный момент есть ошибка с закрепленными диспетчерами, но она должна быть исправлена ​​в нашем следующем выпуске (v1.0.1,), который должен выйти на следующей неделе.

Ваш пробег может варьироваться, но это то, что я бы попробовал - в основном он просто разработан, чтобы помочь уменьшить раздоры и накладные расходы вокруг одного актера.

Другие вопросы по тегам