Экспоненциальный откат в RxJava
У меня есть API, который принимает Observable
это вызывает событие.
Я хочу вернуть Observable
который излучает значение каждый defaultDelay
секунд, если интернет-соединение обнаружено и задерживается numberOfFailedAttempts^2
раз, если нет связи.
Я пробовал кучу разных стилей, самая большая проблема у меня retryWhen's
наблюдаемое оценивается только один раз:
Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();
Есть ли способ сделать то, что я пытаюсь сделать? Я нашел связанный вопрос (сейчас не могу найти его в поиске), но выбранный подход, похоже, не работал с динамическим значением.
3 ответа
В вашем коде есть две ошибки:
- Чтобы повторить некоторую наблюдаемую последовательность, эта последовательность должна быть конечной. Т.е. вместо
interval
вам лучше использовать что-то вродеjust
, или жеfromCallable
как я сделал в образце ниже. - От
repeatWhen
внутренняя функция, вам нужно вернуть новый задержанный наблюдаемый источник, поэтому вместоobservable.delay()
ты должен вернутьсяObservable.timer()
,
Рабочий код:
public void testRepeat() throws InterruptedException {
logger.info("test start");
int DEFAULT_DELAY = 100; // ms
int ADDITIONAL_DELAY = 100; // ms
AtomicInteger generator = new AtomicInteger(0);
AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
.repeatWhen(counts -> {
AtomicInteger retryCounter = new AtomicInteger(0);
return counts.flatMap(c -> {
int retry = 0;
if (connectionAlive.get()) {
retryCounter.set(0); // reset counter
} else {
retry = retryCounter.incrementAndGet();
}
int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
});
})
.subscribe(v -> logger.info("got {}", v));
Thread.sleep(220);
logger.info("connection dropped");
connectionAlive.set(false);
Thread.sleep(2000);
logger.info("connection is back alive");
connectionAlive.set(true);
Thread.sleep(2000);
subscription.dispose();
logger.info("test complete");
}
Смотрите подробную статью о repeatWhen
здесь
Я всегда находил retryWhen
чтобы быть несколько низкоуровневым, поэтому для экспоненциального отката я использую конструктор (например, Abhijit), который тестируется модулем и доступен для RxJava 1.x в rxjava-extras. Я бы предложил использовать ограниченную версию, чтобы экспоненциальное увеличение задержки не выходило за пределы максимального значения, которое вы определяете.
Вот как вы используете это:
observable.retryWhen(
RetryWhen.exponentialBackoff(
delay, maxDelay, TimeUNIT.SECONDS)
.build());
Я не согласен с этим retryWhen
глючит, но если вы найдете ошибку, сообщите об этом в RxJava. Ошибки исправляются быстро!
Вам понадобится rxjava-extras 0.8.0.6 или более поздняя версия, которая находится в Maven Central:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.8.0.6</version>
</dependency>
Дайте мне знать, если вам нужна версия RxJava 2.x. Та же функциональность доступна в rxjava2-extras начиная с 0.1.4.
Вы можете использовать retryWhen
Оператор настраивает задержку при отсутствии соединения. Как периодически выбрасывать предметы - это отдельная тема (ищите interval
или же timer
операторы). Откройте отдельный вопрос, если вы не можете понять это.
У меня есть обширный пример на моем Github, но я дам вам суть здесь.
RetryWithDelay retryWithDelay = RetryWithDelay.builder()
.retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
.build()
Single.fromCallable(() -> {
...
}).retryWhen(retryWithDelay)
.subscribe(j -> {
...
})
RetryWithDelay
определяется следующим образом. Я использовал RxJava 2.x, поэтому, если вы используете 1.x, подпись должна быть Func1<Observable<? extends Throwable>, Observable<Object>>
,
public class RetryWithDelay implements
Function<Flowable<? extends Throwable>, Publisher<Object>> {
...
}
Класс RetryWithDelay.
RetryStrategy enum.
Это позволяет мне настраивать различные виды таймаутов, постоянные, линейные, экспоненциальные, основанные на RetryDelayStrategy
, Для вашего случая использования вы бы выбрали CONSTANT_DELAY_TIMES_RETRY_COUNT
Задержка стратегии и вызова retryDelaySeconds(2)
при строительстве RetryWithDelay
,
retryWhen
это сложный, возможно, даже глючный оператор. Большинство примеров, которые вы найдете в Интернете, используют range
оператор, который потерпит неудачу, если нет повторных попыток. Смотрите мой ответ здесь для деталей.