Структура данных (последовательность, список, массив) асинхронных операций
У меня есть вопрос, касающийся структур данных, которые содержат async
операции. Это может звучать странно.
TestActor
содержит MailBoxProcessor и имеет три функции: Receive
подготавливает процессор почтовых ящиков к приему сообщенийPost
а также PostAndAsyncReply
используются для отправки сообщений актеру.
type TestActor (init, timeout) =
let mutable counter = init
let rcvFun = fun (msg) -> async {
match msg with
| Add i ->
counter <- counter + i
| GetCounter reply ->
reply.Reply counter}
do printfn "Initializing actors: "
do mailbox.Receive (rcvFun, timeout) ////// RECEIVE IS CALLED AT CONSTRUCTION
let mailbox = OnlyLatestMBP<TestMessage> ()
member x.Receive (timeout) =
mailbox.Receive (rcvFun, timeout)
member x.Post (msg: TestMessage, timeout) =
mailbox.Post(msg, timeout)
member x.PostAndAsyncReply (replyChannel, timeout) =
mailbox.PostAndAsyncReply(replyChannel, timeout)
Я хотел бы использовать этот пример, чтобы понять проблему, которая повлияла на мой код. В обычном примере для размещения агентов в структуре данных, Receive
выполнен при строительстве. В моем примере агент может быть протестирован с кодом ниже:
let actorsWorkforce =
seq { 1 .. 5}
|> Seq.map (fun idx -> TestActor(idx, 60000))
let test =
actorsWorkforce
|> Seq.map ( fun idx -> idx.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000) )
|> Async.Parallel
|> Async.RunSynchronously
let result =
test
|> Array.iteri (fun idx element ->
match element with
| Some x -> printfn "Actor %i: OK with result %A" idx x
| None -> printfn "Actor %i: Failed" idx )
И это работает как запланировано.
Тем не менее, скажем, я хотел бы отложить вызов Receive
на более поздней стадии.
type TestActor (init) =
let mutable counter = init
let rcvFun = fun (msg) -> async {
match msg with
| Add i ->
counter <- counter + i
| GetCounter reply ->
reply.Reply counter}
let mailbox = OnlyLatestMBP<TestMessage> ()
member x.Receive (timeout) =
mailbox.Receive (rcvFun, timeout)
member x.Post (msg: TestMessage, timeout) =
mailbox.Post(msg, timeout)
member x.PostAndAsyncReply (replyChannel, timeout) =
mailbox.PostAndAsyncReply(replyChannel, timeout)
let actorsWorkforce =
seq { 1 .. 5}
|> Seq.map (fun idx -> TestActor(idx))
actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000))
let test =
actorsWorkforce
|> Seq.map ( fun idx -> idx.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000) )
|> Async.Parallel
|> Async.RunSynchronously
let result =
test
|> Array.iteri (fun idx element ->
match element with
| Some x -> printfn "Actor %i: OK with result %A" idx x
| None -> printfn "Actor %i: Failed" idx )
Этот кусок кода компилируется, но не работает.mailbox.Receive
имеет тип подписи члена Receive: callback:('a -> Async<unit>) * ?timeout:int -> unit
так что имеет смысл выполнить Receive
с Seq.iter
, Я подозреваю, что код не работает, потому что actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000))
дубликаты actorsWorkforce
когда выполнено.
Это правильно? Как я могу это исправить? Спасибо!
РЕДАКТИРОВАТЬ
Весь код:
open System
open System.Diagnostics
open Microsoft.FSharp.Control
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// OnlyLatest
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type Envelope<'a> = Option<DateTime * 'a>
[<Sealed>]
type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) =
member x.Reply(reply) = replyf(reply)
[<Sealed>]
type OnlyLatestMBP<'a> () =
let mutable currentEnvelope: Envelope<'a> = Envelope<'a>.None
let mutable timestampLastPrcsd: DateTime = DateTime.Now
let mutable react = Unchecked.defaultof<_>
// Msg Box status
let mutable isActive = false
let mutable defaultTimeout = Timeout.Infinite
// Event Messages
let awaitMsg = new AutoResetEvent(false)
let isActiveMsg = new AutoResetEvent(false)
let rec await timeout = async {
let thr = Thread.CurrentThread.ManagedThreadId
printfn "await on thread %i" thr
match currentEnvelope with
| Some (timestamp, x) ->
if timestamp > timestampLastPrcsd then
do! react x
timestampLastPrcsd <- timestamp
printfn "processed message"
currentEnvelope <- Envelope.None
awaitMsg.Reset() |> ignore
return! await timeout
| None ->
let! recd = Async.AwaitWaitHandle(awaitMsg, timeout)
if recd
then return! await timeout
else
isActive <- false
isActiveMsg.Reset() |> ignore
printfn ".. no message within timeout, shutting down" }
member x.DefaultTimeout
with get() = defaultTimeout
and set(value) = defaultTimeout <- value
member x.Receive (callback, ?timeout) =
if not isActive then
isActive <- true
isActiveMsg.Set() |> ignore
let timeout = defaultArg timeout defaultTimeout
react <- callback
let todo = await timeout
Async.Start todo
member x.Post (msg, ?timeout) = async {
let thr = Thread.CurrentThread.ManagedThreadId
printfn "posting on thread %i" thr
let timeout = defaultArg timeout defaultTimeout
if not isActive then
let! recd = Async.AwaitWaitHandle(isActiveMsg, timeout)
if recd then
currentEnvelope <- Envelope.Some(DateTime.Now, msg)
awaitMsg.Set() |> ignore
return true
else return false
else
currentEnvelope <- Envelope.Some(DateTime.Now, msg)
awaitMsg.Set() |> ignore
return true }
member x.PostAndAsyncReply (replyChannelMsg, ?timeout) = async {
let timeout = defaultArg timeout defaultTimeout
let tcs = new TaskCompletionSource<_>()
let msg = replyChannelMsg ( new AsyncReplyChannel<_> (fun reply -> tcs.SetResult(reply)) )
let! posted = x.Post (msg,timeout)
if posted then
match timeout with
| Timeout.Infinite ->
let! result = Async.FromContinuations ( fun (cont, _, _) ->
let apply = fun (task: Task<_>) -> cont (task.Result)
tcs.Task.ContinueWith(apply) |> ignore )
return Some result
| _ ->
let waithandle = tcs.Task.Wait(timeout)
match waithandle with
| false -> return None
| true -> return Some tcs.Task.Result
else return None }
type TestMessage =
| Add of int
| GetCounter of AsyncReplyChannel<int>
type TestActor (init) =
let mutable counter = init
let rcvFun = fun (msg) -> async {
match msg with
| Add i ->
counter <- counter + i
| GetCounter reply ->
reply.Reply counter}
let mailbox = OnlyLatestMBP<TestMessage> ()
// do printfn "Initializing actors: "
// do mailbox.Receive (rcvFun, timeout)
member x.Receive (timeout) =
mailbox.Receive (rcvFun, timeout)
member x.Post (msg: TestMessage, timeout) =
mailbox.Post(msg, timeout)
member x.PostAndAsyncReply (replyChannel, timeout) =
mailbox.PostAndAsyncReply(replyChannel, timeout)
let actorsWorkforce =
seq { 1 .. 5}
|> Seq.map (fun idx -> TestActor(idx))
actorsWorkforce |> Seq.iter (fun actor -> actor.Receive (60000))
let test =
actorsWorkforce
|> Seq.map ( fun idx -> idx.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000) )
|> Async.Parallel
|> Async.RunSynchronously
let result =
test
|> Array.iteri (fun idx element ->
match element with
| Some x -> printfn "Actor %i: OK with result %A" idx x
| None -> printfn "Actor %i: Failed" idx )
1 ответ
Как первоначально предполагалось, проблема действительно была с: actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000))
Проблема была из- за ленивого характера seq
Я создал суженный пример минимального кода.
open System
open System.Diagnostics
open Microsoft.FSharp.Control
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent
type TestActress (name, timerlength) =
let mutable isActive = false
let rec myTask () = async {
Thread.Sleep (timerlength * 1000)
printfn "%s waited : %i" name timerlength
return! myTask () }
member this.Start () =
isActive <- true
Async.Start (myTask ())
member this.GetStatus () = async {
Thread.Sleep (2000)
return isActive }
// One single element, this is easy
let cameronDiaz = TestActress ("Cameron", 10)
cameronDiaz.Start ()
let status = cameronDiaz.GetStatus () |> Async.RunSynchronously
// Async.Parallel receives a seq<Async<'T>> as an input
// This is why I started off with a seq
// This piece of code does not work
let hollywood =
[ "Cameron"; "Pamela"; "Natalie"; "Diane" ]
|> List.toSeq
|> Seq.mapi ( fun idx el -> TestActress (el, idx + 10) )
hollywood |> Seq.iter ( fun el -> el.Start () )
let areTheyWorking =
hollywood
|> Seq.map (fun el -> el.GetStatus ())
|> Async.Parallel
|> Async.RunSynchronously
// Allright, with a list I get the function executed when I expect them to
let hollywood2 =
[ "Cameron"; "Pamela"; "Natalie"; "Diane" ]
|> List.mapi ( fun idx el -> TestActress (el, idx + 10) )
hollywood2 |> List.iter ( fun el -> el.Start () )
let areTheyWorking2 =
hollywood2
|> List.map (fun el -> el.GetStatus ())
|> Async.Parallel
|> Async.RunSynchronously