RxJS - продолжить отправку уведомлений после возникновения ошибки в onNext

Я новичок в Rx, и я был бы очень признателен за небольшую помощь с обработкой ошибок. У меня есть следующий код, который в основном является typeahead:

var observable = Rx.Observable.fromEvent(searchField, 'keyup')
    .map(ev => ev.target.value)
    .filter(text => text.length > 2)
    .debounce(500 /* ms */)
    .distinctUntilChanged()
    .flatMapLatest(getAsyncSearchResults);

observable.subscribe(function(value) {
        console.log("onNext");
        throw "error"; // THIS TERMINATES THE OBSERVABLE
    },
    function(error) {
        console.log("Error");
    },
    function() {
        console.log("Completed");
    });

Observable завершается после возникновения исключения в функции onNext. Мне кажется, что это нежелательное поведение, так как я не хочу оборачивать весь мой код onNext в try-catch. Есть ли способ, которым я мог бы сказать наблюдаемому продолжать выпускать уведомления, независимо от того, какое исключение происходит?

3 ответа

Вам необходимо переключиться на новый одноразовый поток, и если в нем произойдет ошибка, она будет безопасно удалена и сохранит исходный поток живым.

Я изменил ваш код в простой поток 0 - 5, чтобы легко продемонстрировать. Выдает ошибку / исключение, если число равно 3.

Rx.Observable.from([0,1,2,3,4,5])
    .switchMap(value => {

        // This is the disposable stream!
        // Errors can safely occur in here without killing the original stream

        return Rx.Observable.of(value)
            .map(value => {
                if (value === 3) {
                    throw new Error('Value cannot be 3');
                }
                return value;
            })
            .catch(error => {
                // You can do some fancy stuff here with errors if you like
                // Below we are just returning the error object to the outer stream
                return Rx.Observable.of(error);
            });

    })
    .map(value => {
        if (value instanceof Error) {
            // Maybe do some error handling here
            return `Error: ${value.message}`;
        }
        return value;
    })
    .subscribe(
      (x => console.log('Success', x)),
      (x => console.log('Error', x)),
      (() => console.log('Complete'))
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>

Больше информации об этой технике в этой записи блога: The Quest for Meatballs: продолжение потоков RxJS при возникновении ошибок

Вы должны действительно сделать свой next, completed а также error обработчики безопасны для исключений. Для исходного потока нет разумного способа "возобновить" подписку в случае ошибки подписчика - единственный разумный способ - прекратить общение с подписчиком.

Для полного обсуждения того, почему это, смотрите мой ответ. Как обрабатывать исключения, выдаваемые onNext наблюдателя в RxJava? - который также применим к любой платформе Rx.

Более того, см. Почему обратный вызов OnError никогда не вызывается при отбрасывании от данного подписчика? для хорошей пояснительной метафоры (газетного агента) соображений - в частности, необходимо учитывать, что могут быть другие подписчики, которые не бросили ошибку в их next обработчики.

Это поведение Rx на каждой платформе, и единственный способ отловить исключения в наблюдателях - это... ну... перехватить их.

Вы можете создать свой собственный subscribeSafe метод, который оборачивает обратный вызов в try / catch, если у вас часто есть такая необходимость.

В качестве альтернативы, если вы можете переместить детали, которые выбрасывают наблюдатель, в сам поток, вы можете контролировать поток ошибок и (например) повторить попытку.

Пример ниже:

// simulates (hot) keyup event
const obs1 = Rx.Observable.interval(1000).publish()
obs1.connect()

function getAsyncSearchResults(i) {
  return Rx.Observable.timer(500).mapTo(i)
}

obs1.switchMap(getAsyncSearchResults).do(x => {
  console.log(x) // all events gets logged
  throw new Error()
}).retry().subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>

Другие вопросы по тегам