Как молча пропустить исключения в RxJava2?

У меня есть поток данных, как это:

Observable
    .fromFuture(
        CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>>
            listOf(1, 2, 3, 57005, 5)
        },
        Schedulers.computation()
    )
    .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one
    .map {
        CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred.
            if (it == 0xDEAD) {
                throw IOException("Dead value!")
            }

            it
        }
    }
    .flatMap {
        Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again
    }
    .doOnNext {
        println(it) // Debug
    }
    .blockingSubscribe()

Я заменил бизнес-логику (которая на самом деле возвращает Futureс) с CompletableFuture.supplyAsync, И, да, это Котлин, но я думаю, у тебя есть намерение.

Когда я комментирую "мертвое" значение (57005) вывод:

1
4
9
25

Но если это "мертвое" значение появляется в потоке, оно терпит неудачу:

1
4
9
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
    at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86)
    at io.reactivex.Observable.blockingSubscribe(Observable.java:5035)
    at by.dev.madhead.rx.TestKt.main(test.kt:41)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
...

Я новичок в RX, поэтому я быстро нашел решение: onExceptionResumeNext: Observable.fromFuture(it) -> Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() }, Но теперь мое приложение зависает навсегда (после получения ожидаемого результата). Похоже, поток никогда не заканчивается.

Должен ли я "отключить" это Observable как-то или как? Или, в целом, это хороший подход при работе с RX? Должен ли я переосмыслить это по-другому?

1 ответ

Решение

Глотайте исключения, как это:

 Observable.fromFuture(it).onErrorResumeNext(Observable.empty())
Другие вопросы по тегам