subscribe_on с from_iterable/range в RxPY
Я пытаюсь разобраться в планировании реактивных расширений для python. Я хотел бы использовать subscribe_on
обрабатывать несколько наблюдаемых параллельно. Это прекрасно работает, если наблюдаемая создается с just
но не если например range
или же from_
используются.
just
по умолчанию Scheduler.immediate
в то время как другие генераторы по умолчанию Scheduler.current_thread
, Что вызывает разницу, но мне кажется, что это противоречит мне. Возможно, потому что я не понимаю всей проблемы.
Рассмотрим следующий пример:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
# Creates new thread (I like)
rx.Observable.just(3)\
.do_action(work)\
.subscribe_on(Scheduler.new_thread)\
.subscribe(finish)
# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
.do_action(work) \
.subscribe_on(Scheduler.new_thread) \
.subscribe(finish)
Работает с observe_on
или если планировщик передается непосредственно генератору, но я бы хотел отделить наблюдаемое создание от обработки и добиться чего-то вроде этого:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
def factory_single():
return rx.Observable.just(1).do_action(work)
def factory_multiple():
return rx.Observable.range(2, 4).do_action(work)
def process(factory):
factory().subscribe_on(Scheduler.new_thread).subscribe(finish)
# Creates a new thread (I like)
process(factory_single)
# Runs on MainThread (I don't like)
process(factory_multiple)
Я недоразумение subscribe_on
? Мой подход неверен?
1 ответ
В вашем примере есть три действия, которые можно запланировать независимо:
Действие подачи данных.
just
а такжеrange
по умолчанию используются разные планировщики, но между ними нет большой разницы. Оба вводят начальные значения в текущем потоке. Вы можете переопределить их планировщики по умолчанию на все, что пожелаете, передав их в качестве параметра этим методам.Подпишись на акцию. Пользы
Scheduler.current_thread
по умолчанию. Т.е. он выполняется в том же потоке, что и действие подачи данных. Может быть отмененоsubscribe_on
метод.Соблюдайте (
on_next
,on_error
,on_completed
) действие. ПользыScheduler.current_thread
по умолчанию. Т.е. он выполняется в том же потоке, что и действие подписки. Может быть отмененоobserve_on
метод.
Если вы переопределяете планировщик только для одного из этих действий, остальные должны следовать, как описано выше.
О планировщиках
Scheduler.immediate
на самом деле ничего не график. Это немедленно вызывает действие в том же потоке, где это было запланировано.
Scheduler.current_thread
избегает рекурсии путем постановки в очередь действий, но все же вызывает действие в том же потоке, где это было запланировано.
Scheduler.new_thread
запускает один фоновый поток для выполнения действий одно за другим.
Scheduler.timeout
запускает новый фоновый поток для каждого действия, которое необходимо выполнить.
Попытка параллельной обработки
Наиболее подходящим методом для планирования работы в разных потоках является observe_on
,
Проблема в том, что нет thread_pool
Планировщик в RxPy прямо сейчас. new_thread
Планировщик запускает только один поток, так что это вам не сильно поможет.
timeout
Планировщик можно использовать для параллельной работы, но он не дает контроля над количеством одновременных потоков, поэтому взрывной рост числа одновременных задач может привести к переполнению памяти и эффективному отказу вашей системы.
НЕ ошибка в наблюдении
Я попытался запустить ваш пример с observe_on(Scheduler.timeout)
но задачи все равно не шли параллельно. Изучив источник RxPy, я обнаружил, что он планирует следующее событие только после завершения текущего события, что фактически отключает параллельную обработку. Моей первой реакцией было сообщение об ошибке в observe_on
реализация.
Но после дальнейшего изучения я обнаружил, что последовательное выполнение - это не ошибка, а скорее предполагаемое поведение.
Правильный способ выполнять задачи параллельно
Вот код, который работает (на основе этого ответа):
Observable.range(1, 3) \
.select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
.observe_on(Scheduler.event_loop) \
.subscribe(finish)
Observable.start
создает асинхронную наблюдаемую, которая запланирована в отдельном потоке через Scheduler.timeout
,
observe_on(Scheduler.event_loop)
не является обязательным. Это заставляет finish
метод для всех элементов, вызываемых в одном потоке.
Обратите внимание, что нет никакой гарантии, что finish
метод вызывается в начальном range
порядок.