Вернуть результаты вызывающей стороне с дросселирующей очередью

Опираясь на фрагмент и ответ, можно ли будет возвращать результаты вызывающей стороне из очереди регулирования? Я пытался PostAndAsyncReply, чтобы получить ответ на канале, но он выдает ошибку, если я передаю его с помощью Enqueue. Вот код

Цените основанное на ванили F# решение на основе шаблонов проектирования очереди или почтового ящика.

Вопрос

Вопрос заключается в том, чтобы иметь возможность вызывать функции асинхронно, основываясь на газе (максимум 3 за раз), передавая каждый элемент из массива, ожидая завершения всей очереди / массива, собирая все результаты, и затем возвращая результаты в звонящий. (Вернуть результаты вызывающей стороне - вот что ожидает здесь)

Код Калли

// Message type used by the agent - contains queueing
// of work items and notification of completion
type ThrottlingAgentMessage =
  | Completed
  | Enqueue of Async<unit>

/// Represents an agent that runs operations in concurrently. When the number
/// of concurrent operations exceeds 'limit', they are queued and processed later
let throttlingAgent limit =
    MailboxProcessor.Start(fun inbox ->
    async {
      // The agent body is not executing in parallel,
      // so we can safely use mutable queue & counter
      let queue = System.Collections.Generic.Queue<Async<unit>>()
      let running = ref 0

      while true do

        // Enqueue new work items or decrement the counter
        // of how many tasks are running in the background
        let! msg = inbox.Receive()
        match msg with
        | Completed -> decr running
        | Enqueue w -> queue.Enqueue(w)

        // If we have less than limit & there is some work to
        // do, then start the work in the background!
        while running.Value < limit && queue.Count > 0 do
          let work = queue.Dequeue()
          incr running
          do! // When the work completes, send 'Completed'
              // back to the agent to free a slot
              async {
                do! work
                inbox.Post(Completed)
              }
              |> Async.StartChild
              |> Async.Ignore
    })


let requestDetailAsync (url: string) : Async<Result<string, Error>> =
     async {
       Console.WriteLine ("Simulating request " + url)
       try
           do! Async.Sleep(1000) // let's say each request takes about a second
           return Ok (url + ":body...")
       with :? WebException as e ->
           return Error {Code = "500"; Message = "Internal Server Error"; Status = HttpStatusCode.InternalServerError}
     }

let requestMasterAsync() : Async<Result<System.Collections.Concurrent.ConcurrentBag<_>, Error>> =
    async {
        let urls = [|
                    "http://www.example.com/1";
                    "http://www.example.com/2";
                    "http://www.example.com/3";
                    "http://www.example.com/4";
                    "http://www.example.com/5";
                    "http://www.example.com/6";
                    "http://www.example.com/7";
                    "http://www.example.com/8";
                    "http://www.example.com/9";
                    "http://www.example.com/10";
                |]

        let results = System.Collections.Concurrent.ConcurrentBag<_>()
        let agent = throttlingAgent 3

        for url in urls do
            async {
                let! res = requestDetailAsync url
                results.Add res
            }
            |> Enqueue
            |> agent.Post

        return Ok results
    }

Код звонящего

[<TestMethod>]
member this.TestRequestMasterAsync() =
    match Entity.requestMasterAsync() |> Async.RunSynchronously with
    | Ok result -> Console.WriteLine result
    | Error error -> Console.WriteLine error

1 ответ

Решение

Вы могли бы использовать Hopac.Streams для этого. С таким инструментом это довольно тривиально:

open Hopac
open Hopac.Stream
open System

let requestDetailAsync url = async {
   Console.WriteLine ("Simulating request " + url)
   try
       do! Async.Sleep(1000) // let's say each request takes about a second
       return Ok (url + ":body...")
   with :? Exception as e ->
       return Error e
 }

let requestMasterAsync() : Stream<Result<string,exn>> =
    [| "http://www.example.com/1"
       "http://www.example.com/2"
       "http://www.example.com/3"
       "http://www.example.com/4"
       "http://www.example.com/5"
       "http://www.example.com/6"
       "http://www.example.com/7"
       "http://www.example.com/8"
       "http://www.example.com/9"
       "http://www.example.com/10" |]
    |> Stream.ofSeq
    |> Stream.mapPipelinedJob 3 (requestDetailAsync >> Job.fromAsync)

requestMasterAsync()
|> Stream.iterFun (printfn "%A")
|> queue //prints all results asynchronously

let allResults : Result<string,exn> list = 
    requestMasterAsync()
    |> Stream.foldFun (fun results cur -> cur::results ) []
    |> run //fold stream into list synchronously

ДОБАВЛЕНО В случае, если вы хотите использовать только ваниль FSharp.Core только с почтовыми ящиками попробуйте это:

type ThrottlingAgentMessage =
  | Completed
  | Enqueue of Async<unit>

let inline (>>=) x f = async.Bind(x, f)
let inline (>>-) x f = async.Bind(x, f >> async.Return)

let throttlingAgent limit =
    let agent = MailboxProcessor.Start(fun inbox ->
        let queue = System.Collections.Generic.Queue<Async<unit>>()

        let startWork work = 
            work
            >>- fun _ -> inbox.Post Completed
            |> Async.StartChild |> Async.Ignore

        let rec loop curWorkers =
            inbox.Receive()
            >>= function
            | Completed when queue.Count > 0 -> 
                queue.Dequeue() |> startWork
                >>= fun _ -> loop curWorkers
            | Completed -> 
                loop (curWorkers - 1)
            | Enqueue w when curWorkers < limit ->
                w |> startWork
                >>= fun _ -> loop (curWorkers + 1)
            | Enqueue w ->
                queue.Enqueue w
                loop curWorkers

        loop 0)
    Enqueue >> agent.Post

Это почти та же логика, но немного оптимизированная, чтобы не использовать очередь, если есть свободная рабочая емкость (просто начните работу и не беспокойтесь с очередью / очередью).

throttlingAgent это функция int -> Async<unit> -> unitПотому что мы не хотим, чтобы клиент беспокоился о наших внутренних ThrottlingAgentMessage тип.

используйте как это:

let throttler = throttlingAgent 3

for url in urls do
    async {
        let! res = requestDetailAsync url
        results.Add res
    }
    |> throttler
Другие вопросы по тегам