Как интегрировать потоки Akka.Net с ядром AspNet или жирафом

Когда я использую Giraffe или ASP.Net Core в целом, я могу создать систему акторов, добавить ее как службу, а затем заставить ее думать, что обработчик запросов выберет любого актера и спросит / передаст сообщение.

Либо с помощью Cluster.Sharding, либо с помощью обычного user/actor Я знаю, что это будет один экземпляр актора во всей системе, обрабатывающий несколько сообщений.

Как я могу так же общаться с Streams? Они не кажутся ссылками в маршрутизаторе или системой акторов в качестве путей акторов: ссылки на акторы, путь и адреса.

Следует ли это сделать по-другому?

Копируя из раздела ввода-вывода, я мог бы материализовать один граф для обработки каждого запроса, но в целом я общаюсь с "синглетонами", такими как Domain Driven Design Aggregate Roots, для обработки логики домена (вот почему модуль сегментирования), я не уверен, как для выполнения одиночных приемников, которые могут использоваться во вновь материализованном графе в обработчике запросов, поскольку для всех запросов должен быть только один приемник.

1 ответ

Есть много способов интегрировать потоки akka с внешними системами. Тот, который упрощает получение получателя, будет Source.queue(несколько похоже на System.Threading.Channels и предшествующий им). Вы можете материализовать свой поток в точке инициализации, а затем зарегистрировать конечные точки очереди в Giraffe DI - таким образом вы не платите стоимость инициализации одного и того же потока при каждом запросе:

open Akka.Streams
open Akkling
open Akkling.Streams
open FSharp.Control.Tasks.Builders

let run () = task {
    use sys = System.create "sys" <| Configuration.defaultConfig()
    use mat = sys.Materializer()
    
    // construct a stream with async queue on both ends with buffer for 10 elements
    let sender, receiver =
        Source.queue OverflowStrategy.Backpressure 10
        |> Source.map (fun x -> x * x)
        |> Source.toMat (Sink.queue) Keep.both
        |> Graph.run mat
        
    // send data to a queue - quite often result could be just ignored
    match! sender.OfferAsync 2 with
    | :? QueueOfferResult.Enqueued -> () // successfull
    | :? QueueOfferResult.Dropped -> () // doesn't happen in OverflowStrategy.Backpressure 
    | :? QueueOfferResult.QueueClosed -> () // queue has been already closed
    | :? QueueOfferResult.Failure as f -> eprintfn "Unexpected failure: %O" f.Cause
    
    // try to receive data from the queue
    match! receiver.AsyncPull() with
    | Some data -> printfn "Received: %i" data
    | None -> printfn "Stream has been prematurelly closed"
        
    // asynchronously close the queue
    sender.Complete()
    do! sender.WatchCompletionAsync()
}
Другие вопросы по тегам