RxJava: Как поддерживать Observable живым даже после получения ошибки в onError() или ReSubscribe той же Observable

На самом деле я создал конфигурацию типа RxSearch. В котором я прикрепил EditText textChangeListener с PublishSubject. Использование событий для отправки символов в Observable, который используется в качестве входных данных для вызова API модернизации.

проблема

Единственная проблема, с которой я сталкиваюсь - это когда-нибудь я получаю сообщение об ошибке API "неожиданный конец потока" внутри обратного вызова onError() из наблюдаемого. Как только я получил ошибку, Observable перестает работать. Наблюдаемый закрывается, не в состоянии получить символы из onNext() PublishSubject.

Посмотрите на RxSearchObservable

class RxSearchObservable {
companion object {
    fun fromView(editText: EditText): Observable<String> {
        val subject = PublishSubject.create<String>()
        editText.addTextChangedListener(object : TextWatcher {
            override fun afterTextChanged(s: Editable?) {
                //subject.onComplete()
            }

            override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
                //subject.onNext(s.toString())
            }

            override fun onTextChanged(s: CharSequence, start: Int, before: Int, count: Int) {
                if (s.isNotEmpty()) subject.onNext(s.toString())
            }
        })
        return subject
    }
}
}

Как я подписался и сделал Retrofit API вызов в сторону SwitchMap.

 RxSearchObservable.fromView(edtToolSearch)
                    .debounce(700, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .retryWhen { t -> t.delay(3, TimeUnit.SECONDS) }
                    .switchMap { searchTerm ->
                        runOnUiThread { progressBar.visibility = View.VISIBLE }
                        apiManager.getSearchUnits(searchTerm)
                    }
                    .onErrorResumeNext(Observable.empty())
                    .subscribe({ response ->
                        Log.i("Called subscribe", ":::::::::::+++++++++++++++ GONE")
                        progressBar.visibility = View.GONE
                        if (response.isSuccessful) {
                            val units = response.body()
                            val searchedDatasets = units?.dataset
                            if (searchedDatasets?.size!! > 0) {
                                val searchAdapter = SearchAdapter(this@MapActivity, searchedDatasets, false)
                                listSearch.visibility = View.VISIBLE
                                listSearch.adapter = searchAdapter
                            } else {
                                toast("No items found !!!")
                            }
                        } else {
                            apiError = ErrorUtils.parseError(response)
                            toast(apiError.msg)
                        }
                    }, { t: Throwable? ->
                        progressBar.visibility = View.GONE
                        toast(t?.message.toString())
                    }))

Любая идея, помощь, предложение будет оценено. Заранее спасибо.

1 ответ

Поток, ошибки которого прекращаются. Вы можете retry() подписка, но это должно быть сделано только условно. Может быть, с таймаутом, может быть, только несколько раз, может быть, только на определенных ошибках.

В вашем случае вы должны рассмотреть обработку ошибки вызова API в пределах switchMap, Таким образом, ошибка не достигает основного потока.

.switchMap { searchTerm ->
    runOnUiThread { progressBar.visibility = View.VISIBLE }
    apiManager.getSearchUnits(searchTerm)
          .onErrorResumeNext(Observable.empty())
}
Другие вопросы по тегам