Переключение потоков в RX: Sodium эквивалент слияния и переключения в RX

Как RX может решить проблему телевизионного канала, как объяснено в этом выступлении на 31-й минуте?

Проблема, выраженная в Rx, следующая:

Два телевизионных канала (channel1 а также channel2), которые передают поток изображений, а также поток fuzz который не представляет канал или белый шум.

Есть две кнопки, которые отправляют события eButton1 а также eButton2 когда они нажаты.

Эти нажатия кнопок должны приводить к отправке соответствующих каналов на экран.

Каждое нажатие кнопки должно проецироваться (отображаться) на соответствующий канал, а затем все каналы объединяются в поток выбора в виде потока потоков, который начинается с fuzz поток. Наконец, оператор коммутатора отправляет выбранный поток на screen,

Вопрос

Что эквивалентно переключателю Sodium и слиянию в RX?

Можно ли решить это с помощью чисто функций высшего порядка? Т.е. без использования замыканий? Я не понимаю, как это было бы возможно.

2 ответа

Решение

Switch и Merge существуют в основной библиотеке Rx, поэтому, к счастью, код на слайдах фактически переводит буквально строку за строкой в ​​Rx.

Switch оператор работает на потоке потоков - в Rx это тип IObservable<IObservable<T>>,

Коммутатор сглаживает этот поток потоков, отправляя на него только самый последний поток, поэтому вы получите IObservable<T>,

Смотрите пример C# ниже. Я повторно использовал имена переменных в разговоре, насколько это возможно, поэтому за ним должно быть легко следить.

Единственное, что (очень немного) отличается, это hold функция заменяется на эквивалент Rx StartWith,

Включить пакет Nuget Rx-Main и запустите это как консольное приложение. Код подписывается на screen поток и начинает рендеринг кадров из канала "Fuzz" на консоль. Он предложит вам номер канала. Введите 1 или 2, и вы увидите переключение выхода на кадры из соответствующего канала.

// helper method to create channels
private static IObservable<string> CreateChannelStream(
    string name, CompositeDisposable disposables)
{
    // this hacks together a demo channel stream -
    // a stream of "frames" for the channel
    // for simplicity rather than using images, I use a string
    // message for each frame
    // how it works isn't important, just know you'll get a
    // message event every second
    var channel = Observable.Interval(TimeSpan.FromSeconds(1))
                            .Select(x => name + " Frame: " + x)
                            .Publish();
    disposables.Add(channel.Connect());
    return channel;
}

public static void Main()
{       
    // for cleaning up the hot channel streams
    var disposable = new CompositeDisposable();

    // some channels
    var fuzz = CreateChannelStream("Fuzz", disposable);
    var channel1 = CreateChannelStream("Channel1", disposable);
    var channel2 = CreateChannelStream("Channel2", disposable);

    // the button press event streams
    var eButton1 = new Subject<Unit>();
    var eButton2 = new Subject<Unit>();

    // the button presses are projected to
    // the respective channel streams
    // note, you could obtain the channel via a function call here
    // if you wanted to - to keep it close to the slides I'm not.
    var eChan1 = eButton1.Select(_ => channel1);
    var eChan2 = eButton2.Select(_ => channel2);

    // create the selection "stream of streams"
    // an IObservable<IObservable<string>> here
    // that starts with "fuzz"
    var sel = Observable.Merge(eChan1, eChan2).StartWith(fuzz);

    // flatten and select the most recent stream with Switch
    var screen = sel.Switch();

    // subscribe to the screen and print the frames
    // it will start with "fuzz"
    disposable.Add(screen.Subscribe(Console.WriteLine));

    bool quit = false;

    // a little test loop
    // entering 1 or 2 will switch
    // to that channel
    while(!quit)
    {
        var chan = Console.ReadLine();
        switch (chan.ToUpper())
        {
            case "1":
                // raise a button 1 event
                eButton1.OnNext(Unit.Default);
                break;
            case "2":
                // raise a button 2 event
                eButton2.OnNext(Unit.Default);
                break;  
            case "Q":
                quit = true;
                break;                
        }
    }         

    disposable.Dispose();
}

Это правильная вещь:

IObservable<System.Drawing.Image> fuzz = ...
IObservable<System.Drawing.Image> channel1 = ...
IObservable<System.Drawing.Image> channel2 = ...

IObservable<string> eButton1 = ... // produces string "eButton1" when clicked
IObservable<string> eButton2 = ... // produces string "eButton2" when clicked

var output =
(
    from button in eButton1.Merge(eButton2).StartWith("")
    select
        button == "eButton1"
            ? channel1
            : (button == "eButton2"
                ? channel2 
                : fuzz)
).Switch();
Другие вопросы по тегам