OnErrorNotImplementedException с использованием RxJava2 и Retrofit2 Mosby MVI

Я получаю OnErrorNotImplementedException и приложение вылетает, несмотря на обработку ошибки в нисходящем направлении (?).

исключение

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: pl.netlandgroup.smartsab, PID: 9920
io.reactivex.exceptions.OnErrorNotImplementedException: HTTP 401 Unauthorized
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63)
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:56)
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)
    at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
    at io.reactivex.Observable.subscribe(Observable.java:10838)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)
 Caused by: retrofit2.adapter.rxjava2.HttpException: HTTP 401 Unauthorized
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54)
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) 
    at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) 
    at io.reactivex.Observable.subscribe(Observable.java:10838) 
    at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) 
    at io.reactivex.Observable.subscribe(Observable.java:10838) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) 
    at java.lang.Thread.run(Thread.java:761) 

Модернизация хранилища:

class RetrofitRepository (retrofit: Retrofit) {

    val apiService: ApiService = retrofit.create(ApiService::class.java)
    var size: Int = 0

    fun getMapResponse(pageIndex: Int = 0): Observable<MapResponse> {
        return apiService.getMapResponse(pageIndex = pageIndex)
                .doOnError { Log.d("error", it.message) }
                .doOnNext { Log.d("currThread", Thread.currentThread().name) }

    }

    fun getItemsFormResponses(): Observable<List<Item>> {
        val list = mutableListOf<Observable<List<Item>>>()
        val resp0 = getMapResponse()
        resp0.subscribe { size = it.totalCount }
        var accum = 0
        do {
            list.add(getMapResponse(accum).map { it.items })
            accum++
        } while (list.size*200 < size)
        return Observable.merge(list)
    }
}

Эти результаты соблюдаются Interactor:

class MapInteractor @Inject constructor(private val repository: RetrofitRepository) {

    fun getMapItems(): Observable<MapViewState> {
        return repository.getItemsFormResponses()
                .map {
                    if(it.isEmpty()) {
                        return@map MapViewState.EmptyResult()
                    } else {
                        val mapItems = it.map { it.toMapItem() }
                        return@map MapViewState.MapResult(mapItems)
                    }
                }
                .doOnNext { Log.d("currThread", Thread.currentThread().name) }
                .startWith(MapViewState.Loading())
                .onErrorReturn { MapViewState.Error(it) }
    }
}

onErrorReturn { MapViewState.Error(it) } испускает правильно (прямо перед падением приложения я вижу правильную вещь, отображаемую на экране). Как я могу избежать этого исключения при сохранении архитектуры MVI?

РЕДАКТИРОВАТЬ

Ответ, предоставленный dimsuz, был правильным решением, хотя для достижения слияния и возврата одного наблюдаемого со всеми элементами его необходимо было изменить следующим образом:

fun getMapItems(): Observable<List<Item>> {
    return getMapResponse().map {
        val size = it.totalCount
        val list = mutableListOf<Observable<List<Item>>>()
        var accum = 0
        do {
            list.add(getMapResponse(accum++).map { it.items })
        } while (list.size*200 < size)
        return@map list.zip { it.flatten() }
    }.mergeAll()
}

1 ответ

Решение

Я считаю, что ошибка брошена в getItemsFromResponse() вдоль линий:

    val resp0 = getMapResponse()
    resp0.subscribe { size = it.totalCount }

Здесь вы подписываетесь, но не обрабатываете ошибку. На самом деле этот код неверен, потому что вы разбиваете цепочку Rx на две независимые части, вы не должны этого делать.

Что вы должны сделать, это что-то вроде этого:

fun getItemsFormResponses(): Observable<List<Item>> {
  return getMapResponse().map { resp0 ->
    val size = resp0.totalCount

    val list = mutableListOf<Observable<List<Item>>>()
    var accum = 0
    do {
      list.add(getMapResponse(accum).map { it.items })
      accum++
    } while (list.size*200 < size)
    return Observable.merge(list)
  }
}

Т.е. экстракт size расширяя цепь оператором, а не разрывая ее subscribe(),

Решение уже было показано в предыдущем ответе, однако точная причина, по которой вы получаете "OnErrorNotImplementedException", заключается в том, что вы, вероятно, подписываетесь только на "onSuccess" Consumer.

Вам следует добавить еще одного потребителя, который будет обрабатывать события ошибок.

getMapResponse()
            .subscribe ( 
                     { size = it.totalCount },
                     {/* Do something with the error*/}
            )

doOnError не помешает вам получать исключения.

Другие вопросы по тегам