Динамически объединяющие последовательности с Reactive Extensions
Я хочу создать последовательность, которая объединяет одну или несколько последовательностей, созданных динамически (во время выполнения).
Я пробовал с mySequence = mySequence.Concat(anotherSequence)
, но это нарушает текущие подписки на mySequence
так как новая последовательность создается каждый раз.
1 ответ
Когда вы объединяете одну наблюдаемую последовательность в другую, первая последовательность должна заканчиваться, прежде чем вы получите какие-либо значения из второй последовательности. Это больше похоже на то, что вы хотите объединить две или более последовательности - другими словами, получить значения из любой последовательности, как только эта последовательность произведет значения.
Итак, если вы позволите мне изменить .Concat
в .Merge
Похоже, у вас есть такой код на данный момент:
IObservable<long> mySequence = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5);
IDisposable mySequenceSubscription = mySequence.Subscribe(n => Console.WriteLine(n));
IObservable<long> anotherSequence = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5);
mySequence = mySequence.Merge(anotherSequence);
Если я запускаю это, я получаю эти значения:
0 1 2 3 4
Вторая последовательность не объединена.
Теперь, если вы не знаете, когда вы создаете подписку, каковы будущие наблюдаемые, которые вы хотите объединить, то вы можете сделать это:
Subject<IObservable<long>> sources = new Subject<System.IObservable<long>>();
IDisposable sourceSubscription = sources.Merge().Subscribe(n => Console.WriteLine(n));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5));
Теперь результаты выглядят так:
0 1 0 2 3 4 1 2 3 4
Это правильно объединило две наблюдаемые, которые были добавлены после подписки. Просто.