Как обрабатывать исключения в OnNext при использовании ObserveOn?

Мое приложение закрывается при появлении ошибки OnNext наблюдателем, когда я использую ObserveOn(Scheduler.ThreadPool), Единственный способ, который я нашел, чтобы справиться с этим, это использовать собственный метод расширения ниже (не считая того, что OnNext никогда не вызывает исключение). А затем убедившись, что каждый ObserveOn сопровождается ExceptionToError,

    public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
        var sub = new Subject<T>();
        source.Subscribe(i => {
            try {
                sub.OnNext(i);
            } catch (Exception err) {
                sub.OnError(err);
            }
        }
            , e => sub.OnError(e), () => sub.OnCompleted());
        return sub;
    }

Однако это не правильно. Есть ли лучший способ справиться с этим?

пример

Эта программа вылетает из-за необработанного исключения.

class Program {
    static void Main(string[] args) {
        try {
            var xs = new Subject<int>();

            xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
                Console.WriteLine(x);
                if (x % 5 == 0) {
                    throw new System.Exception("Bang!");
                }
            }, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached

            xs.OnNext(1);
            xs.OnNext(2);
            xs.OnNext(3);
            xs.OnNext(4);
            xs.OnNext(5);
        } catch (Exception e) {
            Console.WriteLine("Caught : " + e.Message); // <- also not reached
        } finally {

            Console.ReadKey();
        }
    }
}

4 ответа

Решение

Мы решаем эту проблему в Rx v2.0, начиная с версии RC. Вы можете прочитать все об этом в нашем блоге по адресу http://blogs.msdn.com/rxteam. В основном это сводится к более дисциплинированной обработке ошибок в самом конвейере, в сочетании с методом расширения SubscribeSafe (для перенаправления ошибок во время подписки в канал OnError) и методом расширения Catch в IScheduler (чтобы обернуть планировщик с помощью логики обработки исключений вокруг запланированного действия).

Что касается метода ExceptionToError, предложенного здесь, он имеет один недостаток. Объект подписки IDisposable все еще может быть нулевым, когда выполняются обратные вызовы; есть фундаментальное условие гонки. Чтобы обойти это, вы должны использовать SingleAssignmentDisposable.

Есть разница между ошибками в подписке и ошибками в наблюдаемом. Быстрый тест:

var xs = new Subject<int>();

xs.Subscribe(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, 
             ex => Console.WriteLine("Error in source: " + ex.Message));

Запустите с этим, и вы получите хорошую обработанную ошибку в источнике:

xs.OnNext(1);
xs.OnNext(2);
xs.OnError(new Exception("from source"));

Запустите с этим, и вы получите необработанную ошибку в подписке:

xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);

Что ваше решение сделало, так это приняло ошибки в подписке и сделало их ошибочными в источнике. И вы сделали это в исходном потоке, а не на основе подписки. Вы можете или не хотели делать это, но это почти наверняка неправильно.

"Правильный" способ сделать это - добавить обработку ошибок, которая вам нужна, непосредственно к подписывающему действию, к которому оно относится. Если вы не хотите изменять свои функции подписки напрямую, вы можете воспользоваться небольшим помощником:

public static Action<T> ActionAndCatch<T>(Action<T> action, Action<Exception> catchAction)
{
    return item =>
    {
        try { action(item); }
        catch (System.Exception e) { catchAction(e); }
    };
}

А теперь использовать его, снова показывая разницу между различными ошибками:

xs.Subscribe(ActionAndCatch<int>(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); },
                                 ex => Console.WriteLine("Caught error in subscription: " + ex.Message)),
             ex => Console.WriteLine("Error in source: " + ex.Message));

Теперь мы можем обрабатывать (отдельно) ошибки в источнике и ошибки в подписке. Конечно, любое из этих действий может быть определено в методе, что делает приведенный выше код таким простым (как потенциально):

xs.Subscribe(ActionAndCatch(Handler, ExceptionHandler), SourceExceptionHandler);

редактировать

Затем в комментариях мы начали обсуждать тот факт, что ошибки в подписке указывают на ошибки в самом потоке, и вам не нужны другие подписчики в этом потоке. Это совершенно другой тип проблемы. Я был бы склонен написать заметное Validate расширение для обработки этого сценария:

public static IObservable<T> Validate<T>(this IObservable<T> source, Predicate<T> valid)
{
    return Observable.Create<T>(o => {
        return source.Subscribe(
            x => {
                if (valid(x)) o.OnNext(x);
                else       o.OnError(new Exception("Could not validate: " + x));
            }, e => o.OnError(e), () => o.OnCompleted()
        );
    });
}

Тогда простой в использовании, без смешивания метафор (ошибки только в исходном коде):

xs
.Validate(x => x != 3)
.Subscribe(x => Console.WriteLine(x),
             ex => Console.WriteLine("Error in source: " + ex.Message));

Если вы все еще хотите исключить исключения в Subscribe Вы должны использовать один из других обсуждаемых методов.

Ваше текущее решение не идеально. Как заявил один из людей Rx здесь:

Операторы Rx не перехватывают исключения, возникающие при вызове OnNext, OnError или OnCompleted. Это связано с тем, что мы ожидаем, что (1) разработчик-наблюдатель лучше всех знает, как обрабатывать эти исключения, и мы не можем сделать с ними ничего разумного, и (2) если возникнет исключение, то мы хотим, чтобы это всплыло и не было обработано Rx.,

Ваше текущее решение получает IObservable для обработки ошибок, генерируемых IObserver, что не имеет смысла, так как семантически IObservable не должен знать о том, что его наблюдает. Рассмотрим следующий пример:

var errorFreeSource = new Subject<int>();
var sourceWithExceptionToError = errorFreeSource.ExceptionToError();
var observerThatThrows = Observer.Create<int>(x =>
  {
      if (x % 5 == 0)
          throw new Exception();
  },
  ex => Console.WriteLine("There's an argument that this should be called"),
  () => Console.WriteLine("OnCompleted"));
var observerThatWorks = Observer.Create<int>(
    x => Console.WriteLine("All good"),
    ex => Console.WriteLine("But definitely not this"),
    () => Console.WriteLine("OnCompleted"));
sourceWithExceptionToError.Subscribe(observerThatThrows);
sourceWithExceptionToError.Subscribe(observerThatWorks);
errorFreeSource.OnNext(1);
errorFreeSource.OnNext(2);
errorFreeSource.OnNext(3);
errorFreeSource.OnNext(4);
errorFreeSource.OnNext(5);
Console.ReadLine();

Здесь нет проблем с источником или ObserverThatWorks, но его OnError будет вызываться из-за несвязанной ошибки с другим Observer. Чтобы исключить исключения в другом потоке от завершения процесса, вам нужно будет перехватить их в этом потоке, поэтому поместите блок try/catch в свои наблюдатели.

Я посмотрел на родной SubscribeSafe метод, который должен решить эту проблему, но я не могу заставить его работать. Этот метод имеет единственную перегрузку, которая принимает IObserver<T>:

// Subscribes to the specified source, re-routing synchronous exceptions during
// invocation of the IObservable<T>.Subscribe(IObserver<T>) method to the
// observer's IObserver<T>.OnError(Exception) channel. This method is typically
// used when writing query operators.
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    IObserver<T> observer);

Я попытался передать наблюдателя, созданного Observer.Create фабричный метод, но исключения в onNext обработчик продолжает сбой процесса¹, как и в обычном Subscribe. В итоге я написал свою версию SubscribeSafe. Он принимает в качестве аргументов три обработчика и направляет любые исключения, созданные onNext и onCompleted обработчики onError обработчик.

/// <summary>Subscribes an element handler, an error handler, and a completion
/// handler to an observable sequence. Any exceptions thrown by the element or
/// the completion handler are propagated through the error handler.</summary>
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    // Arguments validation omitted
    var disposable = new SingleAssignmentDisposable();
    disposable.Disposable = source.Subscribe(
        value =>
        {
            try { onNext(value); } catch (Exception ex) { onError(ex); disposable.Dispose(); }
        }, onError, () =>
        {
            try { onCompleted(); } catch (Exception ex) { onError(ex); }
        }
    );
    return disposable;
}

Остерегайтесь необработанного исключения в onError обработчик все равно приведет к сбою процесса!

¹ Выбрасываются только исключения, когда обработчик вызывается асинхронно на ThreadPool.

Вы правы - должно быть плохо. Использование и возврат предметов, подобных этому, не очень хороший путь.

По крайней мере, вы должны реализовать этот метод так:

public static IObservable<T> ExceptionToError<T>(this IObservable<T> source)
{
    return Observable.Create<T>(o =>
    {
        var subscription = (IDisposable)null;
        subscription = source.Subscribe(x =>
        {
            try
            {
                o.OnNext(x);
            }
            catch (Exception ex)
            {
                o.OnError(ex);
                subscription.Dispose();
            }
        }, e => o.OnError(e), () => o.OnCompleted());
        return subscription;
    });
}

Обратите внимание, что темы не используются, и что, если я улавливаю ошибку, я удаляю подписку, чтобы предотвратить продолжение последовательности после ошибки.

Однако почему бы просто не добавить OnError обработчик в вашей подписке. Немного так:

var xs = new Subject<int>();

xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x =>
{
    Console.WriteLine(x);
    if (x % 5 == 0)
    {
        throw new System.Exception("Bang!");
    }
}, ex => Console.WriteLine(ex.Message));

xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);
xs.OnNext(4);
xs.OnNext(5);

Этот код правильно фиксирует ошибку в подписке.

Другой метод заключается в использовании Materialize метод расширения, но это может быть немного излишним, если вышеупомянутые решения не работают.

Другие вопросы по тегам