RxJS - продолжить отправку уведомлений после возникновения ошибки в onNext
Я новичок в Rx, и я был бы очень признателен за небольшую помощь с обработкой ошибок. У меня есть следующий код, который в основном является typeahead:
var observable = Rx.Observable.fromEvent(searchField, 'keyup')
.map(ev => ev.target.value)
.filter(text => text.length > 2)
.debounce(500 /* ms */)
.distinctUntilChanged()
.flatMapLatest(getAsyncSearchResults);
observable.subscribe(function(value) {
console.log("onNext");
throw "error"; // THIS TERMINATES THE OBSERVABLE
},
function(error) {
console.log("Error");
},
function() {
console.log("Completed");
});
Observable завершается после возникновения исключения в функции onNext. Мне кажется, что это нежелательное поведение, так как я не хочу оборачивать весь мой код onNext в try-catch. Есть ли способ, которым я мог бы сказать наблюдаемому продолжать выпускать уведомления, независимо от того, какое исключение происходит?
3 ответа
Вам необходимо переключиться на новый одноразовый поток, и если в нем произойдет ошибка, она будет безопасно удалена и сохранит исходный поток живым.
Я изменил ваш код в простой поток 0 - 5, чтобы легко продемонстрировать. Выдает ошибку / исключение, если число равно 3.
Rx.Observable.from([0,1,2,3,4,5])
.switchMap(value => {
// This is the disposable stream!
// Errors can safely occur in here without killing the original stream
return Rx.Observable.of(value)
.map(value => {
if (value === 3) {
throw new Error('Value cannot be 3');
}
return value;
})
.catch(error => {
// You can do some fancy stuff here with errors if you like
// Below we are just returning the error object to the outer stream
return Rx.Observable.of(error);
});
})
.map(value => {
if (value instanceof Error) {
// Maybe do some error handling here
return `Error: ${value.message}`;
}
return value;
})
.subscribe(
(x => console.log('Success', x)),
(x => console.log('Error', x)),
(() => console.log('Complete'))
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>
Больше информации об этой технике в этой записи блога: The Quest for Meatballs: продолжение потоков RxJS при возникновении ошибок
Вы должны действительно сделать свой next
, completed
а также error
обработчики безопасны для исключений. Для исходного потока нет разумного способа "возобновить" подписку в случае ошибки подписчика - единственный разумный способ - прекратить общение с подписчиком.
Для полного обсуждения того, почему это, смотрите мой ответ. Как обрабатывать исключения, выдаваемые onNext наблюдателя в RxJava? - который также применим к любой платформе Rx.
Более того, см. Почему обратный вызов OnError никогда не вызывается при отбрасывании от данного подписчика? для хорошей пояснительной метафоры (газетного агента) соображений - в частности, необходимо учитывать, что могут быть другие подписчики, которые не бросили ошибку в их next
обработчики.
Это поведение Rx на каждой платформе, и единственный способ отловить исключения в наблюдателях - это... ну... перехватить их.
Вы можете создать свой собственный subscribeSafe
метод, который оборачивает обратный вызов в try / catch, если у вас часто есть такая необходимость.
В качестве альтернативы, если вы можете переместить детали, которые выбрасывают наблюдатель, в сам поток, вы можете контролировать поток ошибок и (например) повторить попытку.
Пример ниже:
// simulates (hot) keyup event
const obs1 = Rx.Observable.interval(1000).publish()
obs1.connect()
function getAsyncSearchResults(i) {
return Rx.Observable.timer(500).mapTo(i)
}
obs1.switchMap(getAsyncSearchResults).do(x => {
console.log(x) // all events gets logged
throw new Error()
}).retry().subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>