RxPy - Как остановить асинхронную наблюдаемую цепочку / работу
Я новичок в ReactiveX, и я хочу сделать что-то простое, но я не могу найти решение.
Я ищу способ остановить асинхронную работу, когда она запланирована в планировщике цикла событий.
Ниже приведен пример, показывающий проблему:
def compute_1(item):
print 'compute_1 (item {}) starting...'.format(item)
time.sleep(0.1)
print 'compute_1 (item {}) finished'.format(item)
return item
def compute_2(item):
print 'compute_2 (item {}) starting...'.format(item)
time.sleep(0.1)
print 'compute_2 (item {}) finished'.format(item)
return item
event_loop = EventLoopScheduler()
disposable = Observable.from_([1,2,3])\
.select(compute_1)\
.select(compute_2)\
.subscribe_on(event_loop)\
.subscribe()
time.sleep(0.3)
print 'disposing...'
disposable.dispose()
time.sleep(5)
Выход:
compute_1 (item 1) starting...
compute_1 (item 1) finished
compute_2 (item 1) starting...
compute_2 (item 1) finished
compute_1 (item 2) starting...
disposing...
compute_1 (item 2) finished
compute_2 (item 2) starting...
compute_2 (item 2) finished
compute_1 (item 3) starting...
compute_1 (item 3) finished
compute_2 (item 3) starting...
compute_2 (item 3) finished
Как видите, даже после утилизации предметы продолжают обрабатываться.
Я ожидаю, что элемент 2 compute_2 и весь элемент 3 не будут выполнены.
Является ли это возможным? Если так, то как правильно это сделать?