В RxJava, как отразить / извлечь ошибку за пределами наблюдаемой?
У нас есть StoreService, который вызывает метод update(key, content), использующий клиент couchbase для получения get ->change_content->replace.
В рамках этого процесса мы используем Obserable retryWhen для обработки исключений. В случае, если повтор превышает максимальное количество повторов, он просто передает исключение, которое затем вызывает метод onError наблюдателя.
То, что мы хотели бы сделать в случае, если Ошибка не может быть обработана, - это выбросить Исключение из метода update(key, content) в StoreService, который его вызывает, но мы не смогли этого сделать.
До сих пор мы безуспешно пробовали следующие способы:
- Вызовите исключение из метода onError, но оно не будет выброшено из Observable.
- Создайте исключение RuntimeException, но оно не сработало.
- Используя DTO, в котором есть логический член isFailed: мы создаем DTO вне Observable, и в случае ошибки мы получаем ошибку onError подписчика, где мы устанавливаем для istoAiled DTO значение true. После того, как Observable заканчивается, мы проверяем, является ли DTO isFailed, и если это так, мы генерируем исключение. Это тоже не сработало - изменения, произошедшие в onError, не распространялись за пределы Observable (почему?)
Вот псевдокод:
public void update(String key, ConversationDto updateConversationDto) {
ObservableExecution observableExecution = new ObservableExecution();
Observable
.defer(... get the document from couchbase ...)
.map(... handle JSON conversion and update the document ...)
.flatMap(documentUpdate -> {
return couchbaseClient.getAsyncBucket().replace(documentUpdate);
})
.retryWhen(new RetryWithDelay(3, 200))
.subscribe(
n -> logger.debug("on next update document -> " + n.content()),
e -> {
//logger.error("failed to insert a document",e);
observableExecution.setFailure(e);
},
() -> logger.debug("on complete update document")
);
// this is never true
if (observableExecution.isFailed()) {
final Throwable e = observableExecution.getFailure();
throw new DalRuntimeException(e.getMessage(), e);
}
}
Это повторный код, когда:
public Observable<?> call(Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable errorNotification) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
logger.debug(errorNotification + " retry no. " + retryCount);
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
logger.debug(errorNotification + " exceeded max retries " + maxRetries);
return Observable.error(errorNotification);
}
});
}
Очень ценю вашу помощь!
2 ответа
Подписка выполняется асинхронно, поэтому isFailed()
проверка всегда запускается немедленно и до e -> setFailure(e)
код работает.
Правильный способ сделать это - вернуть Observable
от update()
метод и подписаться на него в StoreService
, Таким образом, вы будете уведомлены об успехах и неудачах, если вы заинтересованы в их обработке.
Я согласен с @Ross: концептуально Observable должен быть возвращен update(). Единственное упрощение, которое я могу предложить, - это использование локальной изменяемой переменной вместо ObservableExecution DTO:
public void update(String key, ConversationDto updateConversationDto) {
final Throwable[] errorHolder = new Throwable[1];
Observable
.defer(... get the document from couchbase ...)
.map(... handle JSON conversion and update the document ...)
.flatMap(documentUpdate -> {
return couchbaseClient.getAsyncBucket().replace(documentUpdate);
})
.retryWhen(new RetryWithDelay(3, 200))
.subscribe(
n -> logger.debug("on next update document -> " + n.content()),
e -> {
//logger.error("failed to insert a document",e);
errorHolder[0] = e;
},
() -> logger.debug("on complete update document")
);
if (errorHolder[0] != null) {
final Throwable e = errorHolder[0];
throw new DalRuntimeException(e.getMessage(), e);
}
}