RxJava 2: Retry Completable при отправке уведомлений о повторных попытках подписчикам
Я новичок в RxJava 2 и хочу повторить Completable
вызов сервера API до успешного завершения при отправке уведомлений о попытках повторных попыток, чтобы мой пользовательский интерфейс мог отображать состояние повторных попыток для пользователя.
Что-то вроде этого:
public Observable<RetryAttempt> retryServerCall() {
// execute Completable serverCall()
// if an error is thrown, emit new RetryAttempt(++retryCount, error) to subscriber
// retry until serverCall() is successful
}
public Completable serverCall();
public class RetryAttempt {
public RetryAttempt(int retryCount, Throwable cause);
}
Я попробовал несколько разных подходов и столкнулся с препятствиями. Наиболее близок этот подход, создавая вмещающую Observable и явно вызывая onNext() / onComplete() / onError().
public Observable<RetryAttempt> retryServerCall() {
final int[] retryCount = {0};
return Observable.create(e ->
serverCall()
.doOnError(throwable -> e.onNext(new RequestHelp.RetryAttempt(++retryCount[0], throwable)))
.retry()
.subscribe(() -> e.onComplete(), throwable -> e.onError(throwable)));
}
Возможно, это немного второстепенный вопрос, но я должен был использовать final
массив для retryCount
во избежание ошибки variable used in lambda should be final or effectively final
,
Я знаю, что должно быть лучше сделать это с помощью Rx Voodoo. Любое руководство высоко ценится!
1 ответ
public Single<List<Farmer>> getAllFarmers(long timestamp) {
return Observable.fromCallable(() -> mapiFactory.getAllFarmerAboveTime(timestamp))
.doOnError(throwable -> Log.d(TAG, "Error calling getAllFarmers: "+throwable.getMessage()))
.retryWhen(new RetryWithDelay(5,1000))
.concatMap(farmersResponse -> Observable.fromIterable(farmersResponse.farmer))
.filter(farmer -> !StringUtils.isBlank(farmer.cnic))
.map(this::validateCnic)
.distinct(farmer -> farmer.cnic)
.toList();
}
когда метод fromCallable() выдает исключение.retryWhen(new RetryWithDelay(5,1000)) получит выполнение здесь, мы повторяем API 5 раз с экспоненциальной задержкой, начиная с 1000
а вот RetryWithDelay
public class RetryWithDelay implements Function<Observable<Throwable>,
Observable<?>> {
private final int _maxRetries;
private final int _retryDelayMillis;
private int _retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
_maxRetries = maxRetries;
_retryDelayMillis = retryDelayMillis;
_retryCount = 0;
}
@Override
public Observable<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
if (++_retryCount < _maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed)
Log.d(TAG, String.format("Retrying in %d ms", _retryCount * _retryDelayMillis));
return Observable.timer(_retryCount * _retryDelayMillis, TimeUnit.MILLISECONDS);
}
// Max retries hit. Pass an error so the chain is forcibly completed
// only onNext triggers a re-subscription (onError + onComplete kills it)
return Observable.error(throwable);
}
});
}
}