Интервал rxjava в сочетании с другим наблюдаемым

У меня есть наблюдаемое (что горячо), которое делает что-то через системный процесс, и я бы хотел, чтобы интервал также работал, пока наблюдаемый процесс не достигнет полного завершения.

Я вижу оператор интервала: http://reactivex.io/documentation/operators/interval.html

Как я могу объединить эти два, чтобы получить желаемое поведение (в частности, отменить интервал, когда другие нажимают на Complete)?

1 ответ

Ты можешь использовать takeUntil() оператор для отмены (отписки) оператора интервала, takeUntil() принимает Observable в качестве ввода и отмены при вводе Observable испускает предмет

Осталось преобразовать горячее Observable для Observable которые испускают предмет с его onComplete()так что мы можем использовать его как вход для takeUntil(), это возможно с помощью materialize() оператор, который излучает Notification объект для каждого Observable событие (onNext(), onError(), onCompleted()), в сочетании с filter() взять только onCompleted() События.

Observable<Notification<Object>> hotOnCompleteObservable = 
            hot.materialize()
               .filter(notification -> notification.isOnCompleted());

Observable interval = ...
interval.takeUntil(hotOnCompleteObservable);
Другие вопросы по тегам