Observable.ObserveOn(), кажется, не имеет никакого эффекта
Я пытаюсь использовать Rx для параллельной обработки элементов. Кажется, я не могу сказать Rx запустить OnNext() моего наблюдателя параллельно. Вот тестовый код для демонстрации
[Test]
public void ObservableObserveOnNewThreadRunsInParallel()
{
Console.WriteLine("Starting thread: {0}", Thread.CurrentThread.ManagedThreadId);
// store items as they are output
var list = new List<Tuple<string, int, int, int, TimeSpan>>();
// used to wait until a sequences are complete
var ev = new AutoResetEvent(false);
// try these schedulers
var schedulers = new[] {
Tuple.Create("ThreadPoolScheduler.Instance", (IScheduler)ThreadPoolScheduler.Instance),
Tuple.Create("NewThreadScheduler.Default", (IScheduler)NewThreadScheduler.Default),
Tuple.Create("TaskPoolScheduler.Default", (IScheduler)TaskPoolScheduler.Default),
Tuple.Create("Scheduler.Default", (IScheduler)Scheduler.Default),
Tuple.Create("Scheduler.Immediate", (IScheduler)Scheduler.Immediate),
};
// try each scheduler
foreach (var schedulerTuple in
schedulers) {
// emit tuples <i, delay> where delay decreases with each new tuple
// such that output timing is expected to be reversed
var observable =
Observable.Range(0, 5)
.Select(i => Tuple.Create((int)i, (int)(500 - i * 100)))
.Take(5);
var dt = DateTime.Now;
Tuple<string, IScheduler> scheduler = schedulerTuple;
observable
// specify the scheduler to use
.ObserveOn(schedulerTuple.Item2)
.Subscribe(
v => {
// emulate some work (first items take longer than last items)
Thread.Sleep(v.Item2);
// record when the item is done recording
lock (list)
list.Add(
Tuple.Create(
scheduler.Item1,
v.Item1,
v.Item2,
Thread.CurrentThread.ManagedThreadId,
dt - DateTime.Now));
},
// let the test go on
() => ev.Set());
// wait until the end of the sequence
ev.WaitOne();
}
// print observed order
foreach (var i in list) {
Console.WriteLine(i);
}
}
И вывод:
Starting thread: 5
(ThreadPoolScheduler.Instance, 0, 500, 9, -00:00:04.2514251)
(ThreadPoolScheduler.Instance, 1, 400, 9, -00:00:04.6524652)
(ThreadPoolScheduler.Instance, 2, 300, 9, -00:00:04.9524952)
(ThreadPoolScheduler.Instance, 3, 200, 9, -00:00:05.1525152)
(ThreadPoolScheduler.Instance, 4, 100, 9, -00:00:05.2525252)
(NewThreadScheduler.Default, 0, 500, 11, -00:00:06.5066506)
(NewThreadScheduler.Default, 1, 400, 11, -00:00:06.9066906)
(NewThreadScheduler.Default, 2, 300, 11, -00:00:07.2067206)
(NewThreadScheduler.Default, 3, 200, 11, -00:00:07.4067406)
(NewThreadScheduler.Default, 4, 100, 11, -00:00:07.5067506)
(TaskPoolScheduler.Default, 0, 500, 12, -00:00:00.5020502)
(TaskPoolScheduler.Default, 1, 400, 12, -00:00:00.9020902)
(TaskPoolScheduler.Default, 2, 300, 12, -00:00:01.2021202)
(TaskPoolScheduler.Default, 3, 200, 12, -00:00:01.4021402)
(TaskPoolScheduler.Default, 4, 100, 12, -00:00:01.5021502)
(Scheduler.Default, 0, 500, 13, -00:00:00.5020502)
(Scheduler.Default, 1, 400, 13, -00:00:00.9020902)
(Scheduler.Default, 2, 300, 13, -00:00:01.2021202)
(Scheduler.Default, 3, 200, 13, -00:00:01.4021402)
(Scheduler.Default, 4, 100, 13, -00:00:01.5021502)
(Scheduler.Immediate, 0, 500, 5, -00:00:00.5020502)
(Scheduler.Immediate, 1, 400, 5, -00:00:00.9040904)
(Scheduler.Immediate, 2, 300, 5, -00:00:01.2041204)
(Scheduler.Immediate, 3, 200, 5, -00:00:01.4041404)
(Scheduler.Immediate, 4, 100, 5, -00:00:01.5041504)
Обратите внимание, что каждый вызов OnNext ожидал предыдущего вызова, хотя я явно использую ObserveOn()
указать планировщик, который будет использоваться для уведомлений.
Я ожидал, что все, кроме планировщика. Немедленно запустить уведомления параллельно.
Кто-нибудь знает, что я делаю не так?
1 ответ
Это по замыслу. Один из основных контрактов Rx заключается в том, что все уведомления должны быть сериализованы.
См. §§4.2, 6.7 в Руководстве по проектированию Rx.
Наблюдаемые представляют параллелизм в Rx, поэтому для перекрывающихся уведомлений требуется две или более наблюдаемых. Уведомления не будут перекрываться в одном и том же наблюдателе, но они будут перекрываться по отношению к каждому наблюдателю.
Например, если вам нужно выполнять два метода (наблюдателя) одновременно, тогда вам нужно определить две наблюдаемые.
Технически, это наблюдатели (подписки), а не наблюдаемые, которые необходимы для параллелизма; следовательно, подписка на одну и ту же холодную наблюдаемую дважды может привести к параллелизму, в зависимости от планировщика, используемого наблюдаемой; однако, подписка на одну и ту же горячую наблюдаемую дважды не приводит к параллелизму. (См. Мое сообщение в блоге: Горячие и холодные наблюдаемые.)
ObserveOn вводит параллелизм при передаче планировщика, представляющего параллелизм. Но как это может сделать это, не нарушая §6.7 контракта? Ну, это делит наблюдаемое на две наблюдаемые: до оператора и после оператора! Кроме того, вы можете посмотреть на него как на две подписки или наблюдателя: до и после. Предыдущий наблюдатель является внутренним наблюдателем, который предоставляет ObserveOn. Наблюдатель после является вашим наблюдателем или наблюдателем, предоставленным следующим оператором в запросе.
Независимо от того, как вы смотрите на это, уведомления в до наблюдаемой могут происходить одновременно с уведомлениями в после наблюдаемой. Но наблюдатель после будет получать сериализованные уведомления только в контексте наблюдаемой после.