Rx Как распараллелить долгосрочную задачу?

У меня есть следующий фрагмент, который перечисляет элементы некоторого XML (читать из вывода svn log --xml ... process) затем запускает длительный метод для каждого элемента xml.

var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);

var xElements = xml.Descendants("path")
                   .ToObservable()
                   //.SubscribeOn(ThreadPoolScheduler.Instance) 
                   .Select(descendant => return LongRunning(descendant));
xElements
    //.SubscribeOn(NewThreadScheduler.Default)
    .Subscribe(result => Console.WriteLine(result);

Console.ReadKey();

LongRunning метод не важен, но внутри я записываю поток, на котором он работает. Давайте предположим, что это длится целую секунду.

Моя проблема, не комментируя либо SubscribeOn() линия не имеет никакого эффекта вообще. Призывы к LongRunning являются последовательными и происходят каждую секунду в одном и том же потоке (хотя и отличаются от основного (начального) потока).

Это консольное приложение.

Я новичок в Rx. Что мне не хватает?

РЕДАКТИРОВАТЬ:

Попробовав ответ Ли Кэмпбелла, я заметил еще одну проблему.

Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);

var xElements = xml.Descendants("path").ToObservable()
    //.ObserveOn(Scheduler.CurrentThread)
    .SelectMany(descendant =>     
          Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
    .Subscribe(result => Console.WriteLine(
         "Result on: " + Thread.CurrentThread.ManagedThreadId));

[...]

string LongRunning(XElement el)
{
    Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
    DoWork();
    Console.WriteLine("Finished on Thread " + Thread.CurrentThread.ManagedThreadId);
    return "something";
}

Это дает следующий вывод:

Main thread 1
Execute on: Thread 3
Execute on: Thread 4
Execute on: Thread 5
Execute on: Thread 6
Execute on: Thread 7
Finished on Thread 5
Finished on Thread 6
Result on: 5
Result on: 6
Finished on Thread 7
Result on: 7
Finished on Thread 3
Result on: 3
Finished on Thread 4
Result on: 4
Done! Press any key...

Мне нужен способ "поставить в очередь" результаты в одном потоке. Я думал вот что ObserveOn() для, но без комментариев ObserveOn() строка выше не меняет результаты.

1 ответ

Решение

Во-первых, Rx - это библиотека (или парадигма) для управления асинхронностью, в частности наблюдаемые последовательности. Здесь у вас есть перечислимая последовательность (потомки Xml) и блокирующая / синхронная LongRunning вызов метода.

По телефону ToObservable() в вашей перечисляемой последовательности вы действительно соблюдаете только интерфейс, но, поскольку ваша последовательность реализована (нетерпеливо, не лениво), в этом нет ничего реально наблюдаемого / асинхронного.

По телефону SubscribeOn, у вас была правильная идея, но преобразование было сделано уже в ToObservable() оператор. Вы, вероятно, хотели позвонить ToObservable(ThreadPoolScheduler.Instance) так что любая медленная итерация IEnumerable можно сделать на другом потоке. Однако... Я думаю, что это не будет медленный итератор, так что, вероятно, это ничего не решит.

Скорее всего, вы хотите сделать (что сомнительно, если Rx - лучший инструмент для решения этой проблемы), чтобы запланировать вызов LongRunning метод. Однако это означает, что вам нужно будет добавить Asyncrony к вашему выбору. Отличным способом сделать это является один из методов Rx Factory, таких как Observable.FromAsync или же Observable.Start, Это, однако, сделает вашу последовательность IObservable<IObservable<T>>, Вы можете сгладить это с помощью SelectMany или же Merge,

Сказав все это, я думаю, вы хотите сделать следующее:

var proc = Process.Start(avnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);

//EDIT: Added ELS to serialise results onto a single thread.
var els = new EventLoopScheduler(threadStart=>new Thread(threadStart)
    {
        IsBackground=true, 
        Name="MyEventLoopSchedulerThread"
    });

var xElements = xml.Descendants("path").ToObservable()
                .SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),ThreadPoolScheduler.Instance))
                .ObserveOn(els)
                .Subscribe(result => Console.WriteLine(result));

Console.ReadKey();
Другие вопросы по тегам