Реактивные расширения: параллелизм внутри абонента

Я пытаюсь сосредоточиться на поддержке параллелизма в Reactive Extensions, и мне трудно получить результаты, к которым я стремлюсь. Так что я пока не могу этого понять.

У меня есть источник, который передает данные в поток быстрее, чем их может использовать подписчик. Я бы предпочел настроить поток так, чтобы другой поток использовался для вызова подписчика для каждого нового элемента из потока, чтобы у подписчика было несколько потоков, проходящих через него одновременно. Я в состоянии обеспечить потокобезопасность подписчика.

Следующий пример демонстрирует проблему:

Observable.Interval( TimeSpan.FromSeconds(1))
    .Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now, 
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(x =>
               {
                   Console.WriteLine("{0} Thread: {1} Observed value: {2}",
                                     DateTime.Now,
                                     Thread.CurrentThread.ManagedThreadId, x);
                   Thread.Sleep(5000); // Simulate long work time
               });

Вывод консоли выглядит следующим образом (даты удалены):

4:25:20 PM Thread: 6 Source value: 0
4:25:20 PM Thread: 11 Observed value: 0
4:25:21 PM Thread: 12 Source value: 1
4:25:22 PM Thread: 12 Source value: 2
4:25:23 PM Thread: 6 Source value: 3
4:25:24 PM Thread: 6 Source value: 4
4:25:25 PM Thread: 11 Observed value: 1
4:25:25 PM Thread: 12 Source value: 5
4:25:26 PM Thread: 6 Source value: 6

Обратите внимание на дельты времени "Наблюдаемое значение". Подписчик не вызывается параллельно, даже если источник продолжает излучать данные быстрее, чем подписчик может их обработать. Хотя я могу представить несколько сценариев, в которых было бы полезно текущее поведение, мне нужно иметь возможность обрабатывать сообщения, как только они становятся доступными.

Я пробовал несколько вариантов планировщиков с помощью метода ObserveOn, но ни один из них, кажется, не делает то, что я хочу.

Помимо выделения потока в действии "Подписаться" для выполнения длительной работы, есть ли что-то, чего мне не хватает, что позволит одновременно доставлять данные подписчику?

Заранее спасибо за все ответы и предложения!

1 ответ

Решение

Фундаментальная проблема здесь заключается в том, что вы хотите, чтобы наблюдаемый Rx отправлял события таким образом, который действительно нарушает правила работы наблюдаемых. Я думаю, что было бы поучительно взглянуть на рекомендации по проектированию Rx здесь: http://go.microsoft.com/fwlink/?LinkID=205219 - в частности, "4.2 Предположим, что экземпляры наблюдателей вызываются сериализованным образом". т.е. вы не должны иметь возможность выполнять вызовы OnNext параллельно. На самом деле, порядок упорядочения Rx довольно важен для философии дизайна.

Если вы посмотрите на источник, то увидите, что Rx сообщает об этом в ScheduledObserver<T> класс из которого ObserveOnObserver<T> является производным... OnNexts отправляются из внутренней очереди, и каждая должна завершиться до отправки следующей - в заданном контексте выполнения. Rx не позволяет одновременно выполнять вызовы OnNext отдельного абонента.

Это не значит, что у вас не может быть нескольких подписчиков, работающих с разной скоростью. На самом деле это легко увидеть, если вы измените свой код следующим образом:

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}",
                                DateTime.Now,
                                Thread.CurrentThread.ManagedThreadId, x))
    .ObserveOn(NewThreadScheduler.Default);

var subscription1 = source.Subscribe(x =>
    {
        Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}",
                            DateTime.Now,
                            Thread.CurrentThread.ManagedThreadId, x);
        Thread.Sleep(1000); // Simulate long work time
    });

var subscription2 = source.Subscribe(x =>
{
    Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}",
                        DateTime.Now,
                        Thread.CurrentThread.ManagedThreadId, x);
    Thread.Sleep(5000); // Simulate long work time
});

Теперь вы увидите, как подписчик 1 опережает подписчика 2.

То, что вы не можете легко сделать, это попросить наблюдаемое сделать что-то вроде отправки вызова OnNext "готовому" абоненту - что-то вроде того, что вы запрашиваете окольным путем. Я также предполагаю, что вы действительно не захотите создавать новую тему для каждого OnNext в ситуации медленного потребления!

В этом сценарии кажется, что вам может быть лучше с одним подписчиком, который ничего не делает, кроме как максимально быстро переносит работу в очередь, которая, в свою очередь, обслуживается несколькими потребляющими рабочими потоками, которые затем можно контролировать по мере необходимости, чтобы сохранить темп.

Другие вопросы по тегам