Динамически объединяющие последовательности с Reactive Extensions

Я хочу создать последовательность, которая объединяет одну или несколько последовательностей, созданных динамически (во время выполнения).

Я пробовал с mySequence = mySequence.Concat(anotherSequence), но это нарушает текущие подписки на mySequence так как новая последовательность создается каждый раз.

1 ответ

Решение

Когда вы объединяете одну наблюдаемую последовательность в другую, первая последовательность должна заканчиваться, прежде чем вы получите какие-либо значения из второй последовательности. Это больше похоже на то, что вы хотите объединить две или более последовательности - другими словами, получить значения из любой последовательности, как только эта последовательность произведет значения.

Итак, если вы позволите мне изменить .Concat в .Merge Похоже, у вас есть такой код на данный момент:

IObservable<long> mySequence = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5);
IDisposable mySequenceSubscription = mySequence.Subscribe(n => Console.WriteLine(n));
IObservable<long> anotherSequence = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5);
mySequence = mySequence.Merge(anotherSequence);

Если я запускаю это, я получаю эти значения:

0
1
2
3
4

Вторая последовательность не объединена.

Теперь, если вы не знаете, когда вы создаете подписку, каковы будущие наблюдаемые, которые вы хотите объединить, то вы можете сделать это:

Subject<IObservable<long>> sources = new Subject<System.IObservable<long>>();
IDisposable sourceSubscription = sources.Merge().Subscribe(n => Console.WriteLine(n));

sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5));

Теперь результаты выглядят так:

0
1
0
2
3
4
1
2
3
4

Это правильно объединило две наблюдаемые, которые были добавлены после подписки. Просто.

Другие вопросы по тегам