Экспоненциальный откат в RxJava

У меня есть API, который принимает Observable это вызывает событие.

Я хочу вернуть Observable который излучает значение каждый defaultDelay секунд, если интернет-соединение обнаружено и задерживается numberOfFailedAttempts^2 раз, если нет связи.

Я пробовал кучу разных стилей, самая большая проблема у меня retryWhen's наблюдаемое оценивается только один раз:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

Есть ли способ сделать то, что я пытаюсь сделать? Я нашел связанный вопрос (сейчас не могу найти его в поиске), но выбранный подход, похоже, не работал с динамическим значением.

3 ответа

Решение

В вашем коде есть две ошибки:

  1. Чтобы повторить некоторую наблюдаемую последовательность, эта последовательность должна быть конечной. Т.е. вместо interval вам лучше использовать что-то вроде just, или же fromCallable как я сделал в образце ниже.
  2. От repeatWhenвнутренняя функция, вам нужно вернуть новый задержанный наблюдаемый источник, поэтому вместо observable.delay() ты должен вернуться Observable.timer(),

Рабочий код:

public void testRepeat() throws InterruptedException {
    logger.info("test start");

    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                    logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}", v));

    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}

Смотрите подробную статью о repeatWhen здесь

Я всегда находил retryWhen чтобы быть несколько низкоуровневым, поэтому для экспоненциального отката я использую конструктор (например, Abhijit), который тестируется модулем и доступен для RxJava 1.x в rxjava-extras. Я бы предложил использовать ограниченную версию, чтобы экспоненциальное увеличение задержки не выходило за пределы максимального значения, которое вы определяете.

Вот как вы используете это:

observable.retryWhen(
    RetryWhen.exponentialBackoff(
        delay, maxDelay, TimeUNIT.SECONDS)
    .build());

Я не согласен с этим retryWhen глючит, но если вы найдете ошибку, сообщите об этом в RxJava. Ошибки исправляются быстро!

Вам понадобится rxjava-extras 0.8.0.6 или более поздняя версия, которая находится в Maven Central:

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.8.0.6</version>
</dependency>

Дайте мне знать, если вам нужна версия RxJava 2.x. Та же функциональность доступна в rxjava2-extras начиная с 0.1.4.

Вы можете использовать retryWhen Оператор настраивает задержку при отсутствии соединения. Как периодически выбрасывать предметы - это отдельная тема (ищите interval или же timer операторы). Откройте отдельный вопрос, если вы не можете понять это.

У меня есть обширный пример на моем Github, но я дам вам суть здесь.

RetryWithDelay retryWithDelay = RetryWithDelay.builder()
    .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
    .build()

Single.fromCallable(() -> {
    ...
}).retryWhen(retryWithDelay)
.subscribe(j -> {
    ...
})

RetryWithDelay определяется следующим образом. Я использовал RxJava 2.x, поэтому, если вы используете 1.x, подпись должна быть Func1<Observable<? extends Throwable>, Observable<Object>>,

public class RetryWithDelay implements
        Function<Flowable<? extends Throwable>, Publisher<Object>> {
    ...
}

Класс RetryWithDelay.

RetryStrategy enum.

Это позволяет мне настраивать различные виды таймаутов, постоянные, линейные, экспоненциальные, основанные на RetryDelayStrategy, Для вашего случая использования вы бы выбрали CONSTANT_DELAY_TIMES_RETRY_COUNT Задержка стратегии и вызова retryDelaySeconds(2) при строительстве RetryWithDelay,

retryWhen это сложный, возможно, даже глючный оператор. Большинство примеров, которые вы найдете в Интернете, используют range оператор, который потерпит неудачу, если нет повторных попыток. Смотрите мой ответ здесь для деталей.

Другие вопросы по тегам