Как использовать Akka.Streams. *. ConcatMany в F#?

Я хочу создать поток, который создает новый источник (это будет запрос на сохранение) из входящих элементов, а затем сглаживает результаты. Что-то вроде этого упрощенного примера:

var z = Source.Single(1).ConcatMany(i => Source.Single(i));

этот код компилируется и работает, как ожидалось. Моя проблема в том, что когда я перевожу его на F#:

let z = Source.Single(1).ConcatMany(fun i -> Source.Single(i))

Я получаю сообщение об ошибке

This expression was expected to have type
    'IGraph<SourceShape<'a>,Akka.NotUsed>'    
but here has type
    'Source<int,Akka.NotUsed>'    

Я думаю, что причина этого в том, что F# обрабатывает ко / контравариантность иначе, чем C#, и не может просто преобразовать эти общие специализации (https://github.com/fsharp/fslang-suggestions/issues/162), но я не могу понять способ сделать преобразование междуint и SourceShape<int>. Можно ли преобразовать этот пример в F#?

2 ответа

Решение

Глядя на код на GitHub, кажется, чтоSource<TOut, TMat> это прямая реализация IGraph, так что вы можете просто использовать его:

public sealed class Source<TOut, TMat> : IFlow<TOut, TMat>, IGraph<SourceShape<TOut>, TMat>

let z = Source.Single(1).ConcatMany(fun i -> Source.Single(i) :> IGraph<SourceShape<int>,Akka.NotUsed>)

Я думаю, что самая большая разница между использованием C# и F# заключается в том, что C# автоматически сделает за вас апкастинг.

Я нашел один обходной путь - использовать библиотеку-оболочку Akkling.Streams:

open Akkling.Streams

let x =
    Source.singleton 1
    |> Source.collectMap(fun x -> Source.singleton x)

вопрос, как это сделать без Акклинга, остается открытым.

Другие вопросы по тегам