RxPy - Как остановить асинхронную наблюдаемую цепочку / работу

Я новичок в ReactiveX, и я хочу сделать что-то простое, но я не могу найти решение.

Я ищу способ остановить асинхронную работу, когда она запланирована в планировщике цикла событий.

Ниже приведен пример, показывающий проблему:

def compute_1(item):
    print 'compute_1 (item {}) starting...'.format(item)
    time.sleep(0.1)
    print 'compute_1 (item {}) finished'.format(item)
    return item


def compute_2(item):
    print 'compute_2 (item {}) starting...'.format(item)
    time.sleep(0.1)
    print 'compute_2 (item {}) finished'.format(item)
    return item


event_loop = EventLoopScheduler()

disposable = Observable.from_([1,2,3])\
    .select(compute_1)\
    .select(compute_2)\
    .subscribe_on(event_loop)\
    .subscribe()

time.sleep(0.3)

print 'disposing...'
disposable.dispose()

time.sleep(5)

Выход:

compute_1 (item 1) starting...
compute_1 (item 1) finished
compute_2 (item 1) starting...
compute_2 (item 1) finished
compute_1 (item 2) starting...
disposing...
compute_1 (item 2) finished
compute_2 (item 2) starting...
compute_2 (item 2) finished
compute_1 (item 3) starting...
compute_1 (item 3) finished
compute_2 (item 3) starting...
compute_2 (item 3) finished

Как видите, даже после утилизации предметы продолжают обрабатываться.

Я ожидаю, что элемент 2 compute_2 и весь элемент 3 не будут выполнены.

Является ли это возможным? Если так, то как правильно это сделать?

0 ответов

Другие вопросы по тегам