subscribe_on с from_iterable/range в RxPY

Я пытаюсь разобраться в планировании реактивных расширений для python. Я хотел бы использовать subscribe_on обрабатывать несколько наблюдаемых параллельно. Это прекрасно работает, если наблюдаемая создается с justно не если например range или же from_ используются.

just по умолчанию Scheduler.immediateв то время как другие генераторы по умолчанию Scheduler.current_thread, Что вызывает разницу, но мне кажется, что это противоречит мне. Возможно, потому что я не понимаю всей проблемы.
Рассмотрим следующий пример:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


# Creates new thread (I like)
rx.Observable.just(3)\
    .do_action(work)\
    .subscribe_on(Scheduler.new_thread)\
    .subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
    .do_action(work) \
    .subscribe_on(Scheduler.new_thread) \
    .subscribe(finish)

Работает с observe_on или если планировщик передается непосредственно генератору, но я бы хотел отделить наблюдаемое создание от обработки и добиться чего-то вроде этого:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


def factory_single():
    return rx.Observable.just(1).do_action(work)


def factory_multiple():
    return rx.Observable.range(2, 4).do_action(work)


def process(factory):
    factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

# Creates a new thread (I like)
process(factory_single)

# Runs on MainThread (I don't like)
process(factory_multiple)

Я недоразумение subscribe_on? Мой подход неверен?

1 ответ

Решение

В вашем примере есть три действия, которые можно запланировать независимо:

  1. Действие подачи данных. just а также range по умолчанию используются разные планировщики, но между ними нет большой разницы. Оба вводят начальные значения в текущем потоке. Вы можете переопределить их планировщики по умолчанию на все, что пожелаете, передав их в качестве параметра этим методам.

  2. Подпишись на акцию. Пользы Scheduler.current_thread по умолчанию. Т.е. он выполняется в том же потоке, что и действие подачи данных. Может быть отменено subscribe_on метод.

  3. Соблюдайте (on_next, on_error, on_completed) действие. Пользы Scheduler.current_thread по умолчанию. Т.е. он выполняется в том же потоке, что и действие подписки. Может быть отменено observe_on метод.

Если вы переопределяете планировщик только для одного из этих действий, остальные должны следовать, как описано выше.

О планировщиках

Scheduler.immediate на самом деле ничего не график. Это немедленно вызывает действие в том же потоке, где это было запланировано.

Scheduler.current_thread избегает рекурсии путем постановки в очередь действий, но все же вызывает действие в том же потоке, где это было запланировано.

Scheduler.new_thread запускает один фоновый поток для выполнения действий одно за другим.

Scheduler.timeout запускает новый фоновый поток для каждого действия, которое необходимо выполнить.

Попытка параллельной обработки

Наиболее подходящим методом для планирования работы в разных потоках является observe_on,

Проблема в том, что нет thread_pool Планировщик в RxPy прямо сейчас. new_thread Планировщик запускает только один поток, так что это вам не сильно поможет.

timeout Планировщик можно использовать для параллельной работы, но он не дает контроля над количеством одновременных потоков, поэтому взрывной рост числа одновременных задач может привести к переполнению памяти и эффективному отказу вашей системы.

НЕ ошибка в наблюдении

Я попытался запустить ваш пример с observe_on(Scheduler.timeout) но задачи все равно не шли параллельно. Изучив источник RxPy, я обнаружил, что он планирует следующее событие только после завершения текущего события, что фактически отключает параллельную обработку. Моей первой реакцией было сообщение об ошибке в observe_on реализация.

Но после дальнейшего изучения я обнаружил, что последовательное выполнение - это не ошибка, а скорее предполагаемое поведение.

Правильный способ выполнять задачи параллельно

Вот код, который работает (на основе этого ответа):

Observable.range(1, 3) \
  .select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
  .observe_on(Scheduler.event_loop) \
  .subscribe(finish)

Observable.start создает асинхронную наблюдаемую, которая запланирована в отдельном потоке через Scheduler.timeout,

observe_on(Scheduler.event_loop) не является обязательным. Это заставляет finish метод для всех элементов, вызываемых в одном потоке.

Обратите внимание, что нет никакой гарантии, что finish метод вызывается в начальном range порядок.

Другие вопросы по тегам