Как интегрировать потоки Akka.Net с ядром AspNet или жирафом
Когда я использую Giraffe или ASP.Net Core в целом, я могу создать систему акторов, добавить ее как службу, а затем заставить ее думать, что обработчик запросов выберет любого актера и спросит / передаст сообщение.
Либо с помощью Cluster.Sharding, либо с помощью обычного
user/actor
Я знаю, что это будет один экземпляр актора во всей системе, обрабатывающий несколько сообщений.
Как я могу так же общаться с Streams? Они не кажутся ссылками в маршрутизаторе или системой акторов в качестве путей акторов: ссылки на акторы, путь и адреса.
Следует ли это сделать по-другому?
Копируя из раздела ввода-вывода, я мог бы материализовать один граф для обработки каждого запроса, но в целом я общаюсь с "синглетонами", такими как Domain Driven Design Aggregate Roots, для обработки логики домена (вот почему модуль сегментирования), я не уверен, как для выполнения одиночных приемников, которые могут использоваться во вновь материализованном графе в обработчике запросов, поскольку для всех запросов должен быть только один приемник.
1 ответ
Есть много способов интегрировать потоки akka с внешними системами. Тот, который упрощает получение получателя, будет
Source.queue
(несколько похоже на System.Threading.Channels и предшествующий им). Вы можете материализовать свой поток в точке инициализации, а затем зарегистрировать конечные точки очереди в Giraffe DI - таким образом вы не платите стоимость инициализации одного и того же потока при каждом запросе:
open Akka.Streams
open Akkling
open Akkling.Streams
open FSharp.Control.Tasks.Builders
let run () = task {
use sys = System.create "sys" <| Configuration.defaultConfig()
use mat = sys.Materializer()
// construct a stream with async queue on both ends with buffer for 10 elements
let sender, receiver =
Source.queue OverflowStrategy.Backpressure 10
|> Source.map (fun x -> x * x)
|> Source.toMat (Sink.queue) Keep.both
|> Graph.run mat
// send data to a queue - quite often result could be just ignored
match! sender.OfferAsync 2 with
| :? QueueOfferResult.Enqueued -> () // successfull
| :? QueueOfferResult.Dropped -> () // doesn't happen in OverflowStrategy.Backpressure
| :? QueueOfferResult.QueueClosed -> () // queue has been already closed
| :? QueueOfferResult.Failure as f -> eprintfn "Unexpected failure: %O" f.Cause
// try to receive data from the queue
match! receiver.AsyncPull() with
| Some data -> printfn "Received: %i" data
| None -> printfn "Stream has been prematurelly closed"
// asynchronously close the queue
sender.Complete()
do! sender.WatchCompletionAsync()
}