В RxJava, как отразить / извлечь ошибку за пределами наблюдаемой?

У нас есть StoreService, который вызывает метод update(key, content), использующий клиент couchbase для получения get ->change_content->replace.

В рамках этого процесса мы используем Obserable retryWhen для обработки исключений. В случае, если повтор превышает максимальное количество повторов, он просто передает исключение, которое затем вызывает метод onError наблюдателя.

То, что мы хотели бы сделать в случае, если Ошибка не может быть обработана, - это выбросить Исключение из метода update(key, content) в StoreService, который его вызывает, но мы не смогли этого сделать.

До сих пор мы безуспешно пробовали следующие способы:

  1. Вызовите исключение из метода onError, но оно не будет выброшено из Observable.
  2. Создайте исключение RuntimeException, но оно не сработало.
  3. Используя DTO, в котором есть логический член isFailed: мы создаем DTO вне Observable, и в случае ошибки мы получаем ошибку onError подписчика, где мы устанавливаем для istoAiled DTO значение true. После того, как Observable заканчивается, мы проверяем, является ли DTO isFailed, и если это так, мы генерируем исключение. Это тоже не сработало - изменения, произошедшие в onError, не распространялись за пределы Observable (почему?)

Вот псевдокод:

 public void update(String key, ConversationDto updateConversationDto) {

    ObservableExecution observableExecution = new ObservableExecution();

    Observable
            .defer(... get the document from couchbase ...) 
            .map(... handle JSON conversion and update the document ...)
            .flatMap(documentUpdate -> {
                return couchbaseClient.getAsyncBucket().replace(documentUpdate);
            })
            .retryWhen(new RetryWithDelay(3, 200))
            .subscribe(
                    n -> logger.debug("on next update document -> " + n.content()),
                    e -> {
                        //logger.error("failed to insert a document",e);
                        observableExecution.setFailure(e);
                    },
                    () -> logger.debug("on complete update document")

            );
    // this is never true
    if (observableExecution.isFailed()) {
        final Throwable e = observableExecution.getFailure();
        throw new DalRuntimeException(e.getMessage(), e);
    }
}

Это повторный код, когда:

public Observable<?> call(Observable<? extends Throwable> attempts) {
    return attempts
            .flatMap(new Func1<Throwable, Observable<?>>() {
                @Override
                public Observable<?> call(Throwable errorNotification) {
                    if (++retryCount < maxRetries) {
                        // When this Observable calls onNext, the original
                        // Observable will be retried (i.e. re-subscribed).
                        logger.debug(errorNotification + " retry no. " + retryCount);
                        return Observable.timer(retryDelayMillis,
                                TimeUnit.MILLISECONDS);
                    }

                    // Max retries hit. Just pass the error along.
                    logger.debug(errorNotification + " exceeded max retries " + maxRetries);
                    return Observable.error(errorNotification);
                }
            });
}

Очень ценю вашу помощь!

2 ответа

Решение

Подписка выполняется асинхронно, поэтому isFailed() проверка всегда запускается немедленно и до e -> setFailure(e) код работает.

Правильный способ сделать это - вернуть Observable от update() метод и подписаться на него в StoreService, Таким образом, вы будете уведомлены об успехах и неудачах, если вы заинтересованы в их обработке.

Я согласен с @Ross: концептуально Observable должен быть возвращен update(). Единственное упрощение, которое я могу предложить, - это использование локальной изменяемой переменной вместо ObservableExecution DTO:

public void update(String key, ConversationDto updateConversationDto) {
    final Throwable[] errorHolder = new Throwable[1];

    Observable
        .defer(... get the document from couchbase ...) 
        .map(... handle JSON conversion and update the document ...)
        .flatMap(documentUpdate -> {
            return couchbaseClient.getAsyncBucket().replace(documentUpdate);
        })
        .retryWhen(new RetryWithDelay(3, 200))
        .subscribe(
                n -> logger.debug("on next update document -> " + n.content()),
                e -> {
                    //logger.error("failed to insert a document",e);
                    errorHolder[0] = e;
                },
                () -> logger.debug("on complete update document")

        );

    if (errorHolder[0] != null) {
        final Throwable e = errorHolder[0];
        throw new DalRuntimeException(e.getMessage(), e);
    }
}
Другие вопросы по тегам