Как обрабатывать исключения, выданные onNext наблюдателя в RxJava?
Рассмотрим следующий пример:
Observable.range(1, 10).subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Это выводит числа от 1 до 5, а затем печатает исключение.
Я хочу добиться того, чтобы наблюдатель оставался подписанным и продолжал работать после выдачи исключения, то есть печатал все числа от 1 до 10.
Я пытался использовать retry()
и другие различные операторы обработки ошибок, но, как сказано в документации, их целью является обработка ошибок, создаваемых самой наблюдаемой.
Самое простое решение - просто обернуть все тело onNext
в блок try-catch, но это не похоже на хорошее решение для меня. В аналогичном вопросе Rx.NET предлагаемое решение состояло в том, чтобы создать метод расширения, который бы выполнял упаковку, создавая наблюдаемый прокси. Я попытался переделать это:
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));
proxy.subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Это ничего не меняет, потому что RxJava сам заключает подписчика в SafeSubscriber
, С помощью unsafeSubscribe
обойти это тоже не кажется хорошим решением.
Что я могу сделать, чтобы решить эту проблему?
1 ответ
Это общий вопрос, который возникает при изучении Rx.
TL; DR
Ваше предложение разместить логику обработки исключений в подписчике предпочтительнее создания общей наблюдаемой оболочки.
объяснение
Помните, что Rx - это передача событий подписчикам.
Из наблюдаемого интерфейса становится ясно, что на самом деле ничего не может знать наблюдатель о своих подписчиках, кроме того, сколько времени они потратили на обработку события или информацию, содержащуюся в любых выданных исключениях.
Общая оболочка для обработки исключений подписчика и продолжения отправки событий этому подписчику - плохая идея.
Зачем? Хорошо, наблюдаемый должен действительно знать, что подписчик сейчас находится в неизвестном состоянии отказа. Продолжать отправку событий в этой ситуации неразумно - возможно, например, подписчик находится в состоянии, когда каждое событие с этого момента будет вызывать исключение и потребуется некоторое время, чтобы сделать это.
После того, как подписчик выдал исключение, существует только два жизнеспособных варианта действий для наблюдаемого:
- Перебрось исключение
- Реализуйте общую обработку, чтобы регистрировать сбой и прекращать отправлять ему события (любого рода), очищать любые ресурсы, связанные с этим подписчиком, и продолжать работу с оставшимися подписками.
Специальная обработка исключений подписчика была бы плохим выбором дизайна; это создаст ненадлежащую поведенческую связь между подписчиком и наблюдаемым. Так что, если вы хотите быть устойчивым к плохим подписчикам, два вышеупомянутых варианта действительно являются разумным пределом ответственности самой наблюдаемой.
Если вы хотите, чтобы ваш подписчик был отказоустойчивым и продолжал работу, то вам следует полностью обернуть его в логику обработки исключений, предназначенную для обработки определенных исключений, из которых вы знаете, как восстанавливаться (и, возможно, для обработки переходных исключений, ведения журнала, логики повторов, обрыва цепи и т. Д..).
Только сам подписчик будет иметь контекст, чтобы понять, подходит ли он для получения дальнейших событий перед лицом отказа.
Если ваша ситуация требует разработки многократно используемой логики обработки ошибок, помните о том, что нужно обернуть обработчики событий наблюдателя, а не наблюдаемые, и позаботиться о том, чтобы не слепо передавать передаваемые события перед лицом отказа. Выпусти это! В то время как не написано о Rx, это интересная классика по разработке программного обеспечения, которая может многое сказать по этому последнему пункту. Если вы не читали это, я очень советую.