Почему обратный вызов OnError никогда не вызывается при отбрасывании от данного подписчика?
Пожалуйста, соблюдайте следующий юнит-тест:
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace UnitTests
{
[TestClass]
public class TestRx
{
public const int UNIT_TEST_TIMEOUT = 5000;
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
});
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void Subscribe()
{
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
}, e => Assert.Fail(), () =>
{
Assert.AreEqual(100, i);
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeCancel()
{
var tcs = new TaskCompletionSource<object>();
var cts = new CancellationTokenSource();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
cts.Cancel();
}
}, e =>
{
Assert.IsTrue(i < 100);
tcs.TrySetResult(null);
}, () =>
{
Assert.IsTrue(i < 100);
tcs.TrySetResult(null);
}, cts.Token);
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeThrow()
{
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}, e =>
{
Assert.AreEqual("xo-xo", e.Message);
tcs.TrySetResult(null);
}, Assert.Fail);
tcs.Task.Wait();
}
}
}
Юнит тесты SubscribeCancel
а также SubscribeThrow
тайм-аут, потому что OnError
обратный вызов никогда не вызывается, и поэтому ожидание задачи никогда не заканчивается.
Что случилось?
PS
Этот вопрос связан с тем, как правильно обернуть SqlDataReader с IObservable?
РЕДАКТИРОВАТЬ
Тем временем я создал новый выпуск Rx - https://rx.codeplex.com/workitem/74
EDIT2
Следующая реализация наблюдателя дает точно такой же результат, даже если она соответствует параграфу 6.5 Руководства по проектированию Rx - "Реализация подписки не должна генерировать":
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
try
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
obs.OnCompleted();
}
catch (Exception exc)
{
obs.OnError(exc);
}
});
}
EDIT3
Я начинаю полагать, что предполагается писать код, подобный этому, когда асинхронная наблюдаемая последовательность интегрируется в синхронный код, который в противном случае (как правило, имеет место на стороне сервера в том или ином месте):
var tcs = new TaskCompletionSource<object>();
GetObservable().Subscribe(n =>
{
try
{
...
}
catch (Exception e)
{
DoErrorLogic();
tcs.TrySetException(e);
}
}, e =>
{
DoErrorLogic();
tcs.TrySetException(e);
}, () =>
{
DoCompletedLogic();
tcs.TrySetResult(null);
});
tcs.Task.Wait();
Это действительно так?
РЕДАКТИРОВАТЬ 4
Я думаю, что наконец-то начинает течь мой ржавый мозг, что ты пытаешься сказать. Сейчас я переключусь на другой пост - Как правильно обернуть SqlDataReader с помощью IObservable?
2 ответа
Такое поведение разработано. Если подписчик выдает исключение (что, кстати, является плохой практикой), среда Rx правильно считает, что он мертв, и не связывается с ним дальше. Если подписка отменяется, это также не ошибка - просто запрос на отправку дальнейших событий любого рода - что Rx соблюдает.
Редактировать в ответ на комментарии
Я не думаю, что есть легкая ссылка, на которую можно указать в документации - поведение, которое вы видите, настолько внутреннее, что оно неявное. Самое близкое, что я могу получить, это указать вам на исходный код AnonymousSafeObserver и AutoDetatchObserver. У последнего есть объяснительный сценарий, который может помочь, но он немного сложен.
Возможно, поможет аналогия. Представьте, что события потока данных - это газеты, доставляемые газетным агентом. и подписчики являются домашними хозяйствами.
Абонент выбрасывает исключение
Газетный агент с радостью доставляет газеты до тех пор, пока однажды один из подписчиков - мистер Джонс - не включит газ, и его дом взорвется, убив мистера Джонса и разрушив дом (исключение - необработанное исключение). Газетный агент понимает, что он больше не может доставлять газеты мистеру Джонсу, и при этом он не может отправлять уведомление о прекращении, и нет проблем с подачей газеты (поэтому OnError или OnCompleted не подходит), и у газетного агента остается один подписчик.
Сравните это с газетными типографами, которые непреднамеренно используют легковоспламеняющиеся чернила и разжигают фабрику. Теперь плохой газетный агент должен действительно отправить пояснительную записку (OnError) всем своим подписчикам, что подача прекращена на неопределенный срок.
Подписчик отменяет подписку
Мистер Джонс получает газеты по подписке, пока однажды не решит, что ему надоел бесконечный поток депрессивных историй, и просит отменить подписку. Газетный агент обязывает. Он не отправляет г-ну Джонсу записку, объясняющую, что газета прекратила печатать издания (без OnCompleted), - нет. Он также не посылает мистеру Джонсу записку, объясняющую, что газета прекратила свою деятельность (без OnError) - он просто прекращает выпускать газеты, как просил мистер Джонс.
Ответ на Edit3
Я сочувствую вашей борьбе. Я отмечаю, что в вашем коде вы пытались связать идиому TPL (Task) с Rx. Такие попытки часто кажутся неуклюжими, потому что это действительно совсем другие миры. Довольно сложно комментировать такой абзац:
Я начинаю полагать, что предполагается писать код, подобный этому, когда асинхронная наблюдаемая последовательность интегрируется в синхронный код, который в противном случае (как правило, имеет место на стороне сервера в том или ином месте):
Твердо соглашаясь с хорошо сделанным утверждением Брэндона, я не могу вспомнить случаев, когда действительно целесообразно интегрировать асинхронный код с синхронным кодом на стороне сервера так, как вы пытаетесь. Это похоже на дизайнерский запах для меня. Идиоматически, можно было бы попытаться сохранить код реагирующим - сделать подписку и позволить подписчику работать с реактивом. Я не могу вспомнить, чтобы мне приходилось сталкиваться с необходимостью перехода в синхронный код, как ты описываешь.
Конечно, глядя на код, который вы написали в Edit3, не ясно, чего вы пытаетесь достичь. Источник не обязан реагировать на ошибки подписчика. Это хвост, виляющий собакой. Обработчики исключений, которые должны присутствовать для обеспечения непрерывности обслуживания подписчика, должны быть в коде обработки подписки, а не в наблюдаемом источнике - он должен заниматься только защитой от поведения мошенника-наблюдателя. Такая логика реализована в связанном выше AnonymousSafeObserver и используется большинством операторов Rx. Наблюдаемое может очень хорошо иметь логику для обработки непрерывности своих исходных данных - но это другая проблема, а не та, которую вы решаете в своем коде.
Везде, где вы пытаетесь соединиться с синхронным кодом через вызовы ToTask
или же Wait
Вероятно, есть причина тщательно рассмотреть ваш дизайн.
Я чувствую, что предоставление более конкретной постановки проблемы - возможно, взятой из сценария реального мира, который вы пытаетесь решить - послужит для вас более полезным советом. Пример 'SqlDataReader`, где вы говорите...
Наконец, люди могут использовать observable [обертывание SqlDataReader] напрямую, подписавшись на него, но в какой-то момент им придется ждать конца (блокируя поток), поскольку большая часть кода по-прежнему синхронна.
... подчеркивает дизайн болота, в котором вы находитесь. В этом случае, если вы сделаете вывод, что таким потребителям было бы явно лучше использовать IEnumerable<T>
интерфейс - или, возможно, просят IObservable<List<T>>
, Но ключ в том, чтобы взглянуть на более широкую картину, тот факт, что вы пытаетесь обернуть SqlDataReader в наблюдаемую оболочку, является запахом дизайна - потому что это набор фиксированных данных в ответ на конкретный одноразовый запрос. Возможно, это асинхронный сценарий, но на самом деле не реактивный. Сравните с более типичным реактивным сценарием, таким как "отправляйте мне цены на акции X всякий раз, когда вы их получаете", когда вы настраиваете будущий поток данных исключительно по указанию источника, чтобы подписчики могли потом реагировать.
Это явно не указано в руководящих принципах, но подразумевается грамматикой Rx и целью IObservables
, IObservables передают информацию из источника одному или нескольким наблюдателям. Информация, которая передается, является данными (через OnNext
), за которым может следовать OnCompleted
или OnError
, Важно помнить, что эти обратные вызовы запускаются источником. Они не могут и не должны быть вызваны наблюдателем.
Если OnError
называется, это будет потому, что что-то в исходной наблюдаемой цепочке не удалось. Это никогда не будет, потому что наблюдатель потерпел неудачу.
В вашем SubscribeThrow
Например, наблюдатель (построенный из ваших 3-х предоставленных лямбд для OnNext
, OnError
, OnCompleted
) терпит неудачу. Такие ошибки в наблюдателях не могут и не должны приводить к сбою источника, наблюдаемого самим собой.
RX 2.0 ввел гарантии для обеспечения этого контракта. Прочитайте раздел "Обновленная стратегия обработки ошибок" в блоге RX 2.0.
Смежный вопрос: как обрабатывать исключения в OnNext при использовании ObserveOn?
Edit3
Это, безусловно, один из способов сделать это, но это довольно уродливо. Во-первых, я оспариваю ваше утверждение о том, что асинхронный серверный код обычно должен быть синхронным для взаимодействия с некоторым синхронным кодом. Я считаю, что это верно только для юнит-тестов.
Но в любом случае, я считаю, что вы просто подписываетесь слишком рано. Мой опыт работы с Rx заключается в том, что всякий раз, когда я сталкиваюсь с трением, это происходит потому, что я подписываюсь слишком рано и вместо этого должен расширять наблюдаемую цепочку монад.
В вашем примере, вместо того, чтобы подписываться на поток данных и обрабатывать их в своем наблюдателе, думайте о логике вашего процессора как о еще одной проекции поступающих данных. Ваша логика в этом случае просто превращает часть данных в результат работы. Это позволяет вам рассматривать успех или неудачу вашей логики как часть потока, который затем можно наблюдать так, как вы хотите. Вы в конечном итоге с этим:
var data = GetObservable();
var results = data.Select(item =>
{
DoWork(item);
// since your work does not produce anything...
// it either succeeds or throws an exception
// and you cannot make an Observable<void>
// return Unit.Default. Unit is the Rx equivalent of
// void.
return Unit.Default;
});
// subscribe to the stream and wait synchronously for it to finish
results.Wait(); // this will throw an exception the first time DoWork fails
// or asynchronously await the stream to finish...just like a Task
await results;
// or turn the stream into a Task that completes when the processing is complete.
var task = results.ToTask();
Или, что если вы не хотите останавливать обработку первой ошибки, а просто накапливаете ошибки. Теперь это легко, если вы думаете о своей работе как о проекции...
var results = data.Select(item =>
{
try
{
DoWork(item);
return null; // no error
}
catch (Exception e)
{
return e;
}
}).Where(e => e != null).ToList();
var errorList = results.Wait();
// or var errorList = await results;
// or Task<List<Exception>> errorListTask = results.ToTask();
оба эти примера кажутся намного проще и понятнее и возможны, если просто подумать о проблеме по-другому.