Перехват исключений, которые могут быть выброшены из действия подписки OnNext
Я немного новичок в Rx.NET. Можно ли поймать исключение, которое может быть выдано любым из подписчиков? Возьми следующее...
handler.FooStream.Subscribe(
_ => throw new Exception("Bar"),
_ => { });
В настоящее время я ловлю на основе подписки с экземпляром следующего. Реализация которого просто использует ManualResetEvent для пробуждения ожидающего потока.
public interface IExceptionCatcher
{
Action<T> Exec<T>(Action<T> action);
}
и используя это так...
handler.FooStream.Subscribe(
_exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
_ => { });
Я чувствую, что должен быть какой-то лучший способ. Все ли возможности обработки ошибок в Rx.NET специально предназначены для работы с наблюдаемыми ошибками?
РЕДАКТИРОВАТЬ: По запросу моя реализация https://gist.github.com/1409829 (интерфейс и реализация разделены на разные сборки в коде продукта). Обратная связь приветствуется. Это может показаться глупым, но я использую Castle Windsor для управления многими различными абонентами Rx. Эта ловушка исключений зарегистрирована в таком контейнере
windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));
Затем он будет использоваться как это, где observable
это экземпляр IObservable:
var exceptionCatcher =
new ExceptionCatcher(e =>
{
Logger.FatalException(
"Exception caught, shutting down.", e);
// Deal with unmanaged resources here
}, false);
/*
* Normally the code below exists in some class managed by an IoC container.
* 'catcher' would be provided by the container.
*/
observable /* do some filtering, selecting, grouping etc */
.SubscribeWithExceptionCatching(processItems, catcher);
1 ответ
Встроенные операторы Observable не выполняют то, что вы запрашиваете по умолчанию (очень похоже на события), но вы можете создать метод расширения, который бы это делал.
public static IObservable<T> IgnoreObserverExceptions<T, TException>(
this IObservable<T> source
) where TException : Exception
{
return Observable.CreateWithDisposable<T>(
o => source.Subscribe(
v => { try { o.OnNext(v); }
catch (TException) { }
},
ex => o.OnError(ex),
() => o.OnCompleted()
));
}
Тогда любой наблюдаемый может быть обернут этим методом, чтобы получить поведение, которое вы описали.