Как изменить реализацию Rx Builder, чтобы исправить исключение переполнения стека?
Я пытаюсь придумать Rx Builder для использования Reactive Extension в синтаксисе выражения вычисления F#. Как я могу это исправить, чтобы он не взорвал стек? Как пример Seq ниже. И есть ли планы предоставить реализацию RxBuilder как часть Reactive Extensions или как часть будущих версий.NET Framework?
open System
open System.Linq
open System.Reactive.Linq
type rxBuilder() =
member this.Delay f = Observable.Defer f
member this.Combine (xs: IObservable<_>, ys : IObservable<_>) =
Observable.merge xs ys
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
let rx = rxBuilder()
let rec f x = seq { yield x
yield! f (x + 1) }
let rec g x = rx { yield x
yield! g (x + 1) }
//do f 5 |> Seq.iter (printfn "%A")
do g 5 |> Observable.subscribe (printfn "%A") |> ignore
do System.Console.ReadLine() |> ignore
5 ответов
Короткий ответ заключается в том, что Rx Framework не поддерживает создание наблюдаемых объектов с использованием рекурсивного шаблона, подобного этому, поэтому это не может быть легко сделано. Combine
Операция, которая используется для последовательностей F#, нуждается в особой обработке, которую не обеспечивают наблюдаемые. Rx Framework, вероятно, ожидает, что вы будете генерировать наблюдаемые, используя Observable.Generate
а затем использовать LINQ запросы /F# компоновщик вычислений для их обработки.
Во всяком случае, вот некоторые мысли -
Прежде всего, вам нужно заменить Observable.merge
с Observable.Concat
, Первый запускает обе наблюдаемые параллельно, а второй сначала выдает все значения из первой наблюдаемой, а затем выдает значения из второй наблюдаемой. После этого изменения фрагмент по крайней мере напечатает ~800 чисел до переполнения стека.
Причиной переполнения стека является то, что Concat
создает наблюдаемое, которое вызывает Concat
создать другую наблюдаемую, которая вызывает Concat
и т.д. Один из способов решить эту проблему - добавить синхронизацию. Если вы используете Windows Forms, вы можете изменить Delay
так что он планирует наблюдаемое в потоке GUI (который отбрасывает текущий стек). Вот эскиз:
type RxBuilder() =
member this.Delay f =
let sync = System.Threading.SynchronizationContext.Current
let res = Observable.Defer f
{ new IObservable<_> with
member x.Subscribe(a) =
sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
// Note: This is wrong, but we cannot easily get the IDisposable here
null }
member this.Combine (xs, ys) = Observable.Concat(xs, ys)
member this.Yield x = Observable.Return x
member this.YieldFrom (xs:IObservable<_>) = xs
Чтобы реализовать это должным образом, вы должны написать свой собственный Concat
метод, который довольно сложный. Идея была бы такова:
- Concat возвращает некоторый специальный тип, например
IConcatenatedObservable
- Когда метод вызывается рекурсивно, вы создаете цепочку
IConcatenatedObservable
которые ссылаются друг на друга Concat
Метод будет искать эту цепочку, и когда будет, например, три объекта, он отбрасывает средний (чтобы всегда иметь цепочку длины не более 2).
Это слишком сложно для ответа Stackru, но это может быть полезным отзывом для команды Rx.
Обратите внимание, что это было исправлено в Rx v2.0 (как уже упоминалось здесь), в более общем плане для всех операторов секвенирования (Concat, Catch, OnErrorResumeNext), а также для императивных операторов (If, While и т. Д.).
По сути, вы можете думать об этом классе операторов как о подписке на другую последовательность в сообщении наблюдателя за терминалом (например, Concat подписывается на следующую последовательность после получения сообщения OnCompleted текущего), где и происходит аналогия с хвостовой рекурсией.
В Rx v2.0 все хвостовые рекурсивные подписки сведены в структуру, похожую на очередь, для обработки по одной за раз, общаясь с нижестоящим наблюдателем. Это позволяет избежать неограниченного роста наблюдателей, разговаривающих друг с другом для последовательных подписок на последовательности.
Как насчет этого?
type rxBuilder() =
member this.Delay (f : unit -> 'a IObservable) =
{ new IObservable<_> with
member this.Subscribe obv = (f()).Subscribe obv }
member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
{ new IObservable<_> with
member this.Subscribe obv = xs.Subscribe obv ;
ys.Subscribe obv }
member this.Yield x = Observable.Return x
member this.YieldFrom xs = xs
let rx = rxBuilder()
let rec f x = rx { yield x
yield! f (x + 1) }
do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore
do System.Console.ReadLine() |> ignore
http://rxbuilder.codeplex.com/ (создан для экспериментов с RxBuilder)
Xs одноразовые не подключены. Как только я пытаюсь подключить одноразовое устройство, оно снова начинает взрываться.
Если мы удалим синтаксический сахар из этого вычислительного выражения (он же Monad), мы получим:
let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )
Или в C#:
public static IObservable<int> g(int x)
{
return Observable.Defer<int>(() =>
{
return Observable.Merge(Observable.Return(x), g(x + 1));
});
}
Который определенно не хвост рекурсивный. Я думаю, что если вы можете сделать это хвостом рекурсивным, то это, вероятно, решит вашу проблему