Перехват исключений, которые могут быть выброшены из действия подписки OnNext

Я немного новичок в Rx.NET. Можно ли поймать исключение, которое может быть выдано любым из подписчиков? Возьми следующее...

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });

В настоящее время я ловлю на основе подписки с экземпляром следующего. Реализация которого просто использует ManualResetEvent для пробуждения ожидающего потока.

public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}

и используя это так...

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });

Я чувствую, что должен быть какой-то лучший способ. Все ли возможности обработки ошибок в Rx.NET специально предназначены для работы с наблюдаемыми ошибками?

РЕДАКТИРОВАТЬ: По запросу моя реализация https://gist.github.com/1409829 (интерфейс и реализация разделены на разные сборки в коде продукта). Обратная связь приветствуется. Это может показаться глупым, но я использую Castle Windsor для управления многими различными абонентами Rx. Эта ловушка исключений зарегистрирована в таком контейнере

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));

Затем он будет использоваться как это, где observable это экземпляр IObservable:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);

1 ответ

Решение

Встроенные операторы Observable не выполняют то, что вы запрашиваете по умолчанию (очень похоже на события), но вы можете создать метод расширения, который бы это делал.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception
{
    return Observable.CreateWithDisposable<T>(
        o => source.Subscribe(
            v => { try { o.OnNext(v); }
                   catch (TException) { }
            },
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));
}

Тогда любой наблюдаемый может быть обернут этим методом, чтобы получить поведение, которое вы описали.

Другие вопросы по тегам