Как обрабатывать исключения, выданные onNext наблюдателя в RxJava?

Рассмотрим следующий пример:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

Это выводит числа от 1 до 5, а затем печатает исключение.

Я хочу добиться того, чтобы наблюдатель оставался подписанным и продолжал работать после выдачи исключения, то есть печатал все числа от 1 до 10.

Я пытался использовать retry() и другие различные операторы обработки ошибок, но, как сказано в документации, их целью является обработка ошибок, создаваемых самой наблюдаемой.

Самое простое решение - просто обернуть все тело onNext в блок try-catch, но это не похоже на хорошее решение для меня. В аналогичном вопросе Rx.NET предлагаемое решение состояло в том, чтобы создать метод расширения, который бы выполнял упаковку, создавая наблюдаемый прокси. Я попытался переделать это:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

Это ничего не меняет, потому что RxJava сам заключает подписчика в SafeSubscriber, С помощью unsafeSubscribe обойти это тоже не кажется хорошим решением.

Что я могу сделать, чтобы решить эту проблему?

1 ответ

Решение

Это общий вопрос, который возникает при изучении Rx.

TL; DR

Ваше предложение разместить логику обработки исключений в подписчике предпочтительнее создания общей наблюдаемой оболочки.

объяснение

Помните, что Rx - это передача событий подписчикам.

Из наблюдаемого интерфейса становится ясно, что на самом деле ничего не может знать наблюдатель о своих подписчиках, кроме того, сколько времени они потратили на обработку события или информацию, содержащуюся в любых выданных исключениях.

Общая оболочка для обработки исключений подписчика и продолжения отправки событий этому подписчику - плохая идея.

Зачем? Хорошо, наблюдаемый должен действительно знать, что подписчик сейчас находится в неизвестном состоянии отказа. Продолжать отправку событий в этой ситуации неразумно - возможно, например, подписчик находится в состоянии, когда каждое событие с этого момента будет вызывать исключение и потребуется некоторое время, чтобы сделать это.

После того, как подписчик выдал исключение, существует только два жизнеспособных варианта действий для наблюдаемого:

  • Перебрось исключение
  • Реализуйте общую обработку, чтобы регистрировать сбой и прекращать отправлять ему события (любого рода), очищать любые ресурсы, связанные с этим подписчиком, и продолжать работу с оставшимися подписками.

Специальная обработка исключений подписчика была бы плохим выбором дизайна; это создаст ненадлежащую поведенческую связь между подписчиком и наблюдаемым. Так что, если вы хотите быть устойчивым к плохим подписчикам, два вышеупомянутых варианта действительно являются разумным пределом ответственности самой наблюдаемой.

Если вы хотите, чтобы ваш подписчик был отказоустойчивым и продолжал работу, то вам следует полностью обернуть его в логику обработки исключений, предназначенную для обработки определенных исключений, из которых вы знаете, как восстанавливаться (и, возможно, для обработки переходных исключений, ведения журнала, логики повторов, обрыва цепи и т. Д..).

Только сам подписчик будет иметь контекст, чтобы понять, подходит ли он для получения дальнейших событий перед лицом отказа.

Если ваша ситуация требует разработки многократно используемой логики обработки ошибок, помните о том, что нужно обернуть обработчики событий наблюдателя, а не наблюдаемые, и позаботиться о том, чтобы не слепо передавать передаваемые события перед лицом отказа. Выпусти это! В то время как не написано о Rx, это интересная классика по разработке программного обеспечения, которая может многое сказать по этому последнему пункту. Если вы не читали это, я очень советую.

Другие вопросы по тегам