Как изменить реализацию Rx Builder, чтобы исправить исключение переполнения стека?

Я пытаюсь придумать Rx Builder для использования Reactive Extension в синтаксисе выражения вычисления F#. Как я могу это исправить, чтобы он не взорвал стек? Как пример Seq ниже. И есть ли планы предоставить реализацию RxBuilder как часть Reactive Extensions или как часть будущих версий.NET Framework?

open System
open System.Linq
open System.Reactive.Linq

type rxBuilder() =    
    member this.Delay f = Observable.Defer f
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
        Observable.merge xs ys      
    member this.Yield x = Observable.Return x
    member this.YieldFrom (xs:IObservable<_>) = xs

let rx = rxBuilder()

let rec f x = seq { yield x 
                    yield! f (x + 1) }

let rec g x = rx { yield x 
                    yield! g (x + 1) }


//do f 5 |> Seq.iter (printfn "%A")

do g 5 |> Observable.subscribe (printfn "%A") |> ignore

do System.Console.ReadLine() |> ignore

5 ответов

Решение

Короткий ответ заключается в том, что Rx Framework не поддерживает создание наблюдаемых объектов с использованием рекурсивного шаблона, подобного этому, поэтому это не может быть легко сделано. Combine Операция, которая используется для последовательностей F#, нуждается в особой обработке, которую не обеспечивают наблюдаемые. Rx Framework, вероятно, ожидает, что вы будете генерировать наблюдаемые, используя Observable.Generate а затем использовать LINQ запросы /F# компоновщик вычислений для их обработки.

Во всяком случае, вот некоторые мысли -

Прежде всего, вам нужно заменить Observable.merge с Observable.Concat, Первый запускает обе наблюдаемые параллельно, а второй сначала выдает все значения из первой наблюдаемой, а затем выдает значения из второй наблюдаемой. После этого изменения фрагмент по крайней мере напечатает ~800 чисел до переполнения стека.

Причиной переполнения стека является то, что Concat создает наблюдаемое, которое вызывает Concat создать другую наблюдаемую, которая вызывает Concat и т.д. Один из способов решить эту проблему - добавить синхронизацию. Если вы используете Windows Forms, вы можете изменить Delay так что он планирует наблюдаемое в потоке GUI (который отбрасывает текущий стек). Вот эскиз:

type RxBuilder() =   
  member this.Delay f = 
      let sync = System.Threading.SynchronizationContext.Current 
      let res = Observable.Defer f
      { new IObservable<_> with
          member x.Subscribe(a) = 
            sync.Post( (fun _ -> res.Subscribe(a) |> ignore), null)
            // Note: This is wrong, but we cannot easily get the IDisposable here
            null }
  member this.Combine (xs, ys) = Observable.Concat(xs, ys)
  member this.Yield x = Observable.Return x
  member this.YieldFrom (xs:IObservable<_>) = xs

Чтобы реализовать это должным образом, вы должны написать свой собственный Concat метод, который довольно сложный. Идея была бы такова:

  • Concat возвращает некоторый специальный тип, например IConcatenatedObservable
  • Когда метод вызывается рекурсивно, вы создаете цепочку IConcatenatedObservable которые ссылаются друг на друга
  • Concat Метод будет искать эту цепочку, и когда будет, например, три объекта, он отбрасывает средний (чтобы всегда иметь цепочку длины не более 2).

Это слишком сложно для ответа Stackru, но это может быть полезным отзывом для команды Rx.

Обратите внимание, что это было исправлено в Rx v2.0 (как уже упоминалось здесь), в более общем плане для всех операторов секвенирования (Concat, Catch, OnErrorResumeNext), а также для императивных операторов (If, While и т. Д.).

По сути, вы можете думать об этом классе операторов как о подписке на другую последовательность в сообщении наблюдателя за терминалом (например, Concat подписывается на следующую последовательность после получения сообщения OnCompleted текущего), где и происходит аналогия с хвостовой рекурсией.

В Rx v2.0 все хвостовые рекурсивные подписки сведены в структуру, похожую на очередь, для обработки по одной за раз, общаясь с нижестоящим наблюдателем. Это позволяет избежать неограниченного роста наблюдателей, разговаривающих друг с другом для последовательных подписок на последовательности.

Это было исправлено в Rx 2.0 Beta. А вот и тест.

Как насчет этого?

type rxBuilder() =    
   member this.Delay (f : unit -> 'a IObservable) = 
               { new IObservable<_> with
                    member this.Subscribe obv = (f()).Subscribe obv }
   member this.Combine (xs:'a IObservable, ys: 'a IObservable) =
               { new IObservable<_> with
                    member this.Subscribe obv = xs.Subscribe obv ; 
                                                ys.Subscribe obv }
   member this.Yield x = Observable.Return x
   member this.YieldFrom xs = xs

let rx = rxBuilder()

let rec f x = rx { yield x 
                   yield! f (x + 1) }

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore

do System.Console.ReadLine() |> ignore

http://rxbuilder.codeplex.com/ (создан для экспериментов с RxBuilder)

Xs одноразовые не подключены. Как только я пытаюсь подключить одноразовое устройство, оно снова начинает взрываться.

Если мы удалим синтаксический сахар из этого вычислительного выражения (он же Monad), мы получим:

let rec g x = Observable.Defer (fun () -> Observable.merge(Observable.Return x, g (x + 1) )

Или в C#:

public static IObservable<int> g(int x)
{
    return Observable.Defer<int>(() =>
    {
      return Observable.Merge(Observable.Return(x), g(x + 1));                    
    });
}

Который определенно не хвост рекурсивный. Я думаю, что если вы можете сделать это хвостом рекурсивным, то это, вероятно, решит вашу проблему

Другие вопросы по тегам