Как правильно обернуть SqlDataReader с IObservable?
Я хотел бы изучить возможность использования IObservable<T>
как обертка вокруг SqlDataReader
, До сих пор мы использовали ридер, чтобы избежать материализации всего результата в памяти, и мы делали это, используя блокировку синхронного API.
Теперь мы хотим попробовать и использовать асинхронный API в сочетании с.NET Reactive Extensions.
Однако этот код должен будет сосуществовать с синхронным кодом, поскольку принятие асинхронных способов является постепенным процессом.
Мы уже знаем, что это сочетание синхронного и асинхронного не будет работать в ASP.NET - для этого весь путь выполнения запроса должен быть асинхронным во всем. Отличная статья на эту тему http://blog.stephencleary.com/2012/07/dont-block-on-async-code.html
Но я говорю о простом сервисе WCF. Мы уже смешиваем асинхронный и синхронный код там, но мы впервые хотим представить Rx, и возникают проблемы.
Я создал простые модульные тесты (мы используем mstest, sigh:-() для демонстрации проблем. Я надеюсь, что кто-то сможет объяснить мне, что происходит. Пожалуйста, найдите ниже весь исходный код (используя Moq):
using System;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
namespace UnitTests
{
public static class Extensions
{
public static Task<List<T>> ToListAsync<T>(this IObservable<T> observable)
{
var res = new List<T>();
var tcs = new TaskCompletionSource<List<T>>();
observable.Subscribe(res.Add, e => tcs.TrySetException(e), () => tcs.TrySetResult(res));
return tcs.Task;
}
}
[TestClass]
public class TestRx
{
public const int UNIT_TEST_TIMEOUT = 5000;
private static DbDataReader CreateDataReader(int count = 100, int msWait = 10)
{
var curItemIndex = -1;
var mockDataReader = new Mock<DbDataReader>();
mockDataReader.Setup(o => o.ReadAsync(It.IsAny<CancellationToken>())).Returns<CancellationToken>(ct => Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
if (curItemIndex + 1 < count && !ct.IsCancellationRequested)
{
++curItemIndex;
return true;
}
Trace.WriteLine(curItemIndex);
return false;
}));
mockDataReader.Setup(o => o[0]).Returns<int>(_ => curItemIndex);
mockDataReader.CallBase = true;
mockDataReader.Setup(o => o.Close()).Verifiable();
return mockDataReader.Object;
}
private static IObservable<int> GetObservable(DbDataReader reader)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
using (reader)
{
while (!cancellationToken.IsCancellationRequested && await reader.ReadAsync(cancellationToken))
{
obs.OnNext((int)reader[0]);
}
}
});
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToListAsyncResult()
{
var reader = CreateDataReader();
var numbers = GetObservable(reader).ToListAsync().Result;
CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableToList()
{
var reader = CreateDataReader();
var numbers = GetObservable(reader).ToEnumerable().ToList();
CollectionAssert.AreEqual(Enumerable.Range(0, 100).ToList(), numbers);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEach()
{
var reader = CreateDataReader();
int i = 0;
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
}
Assert.AreEqual(100, i);
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEachBreak()
{
var reader = CreateDataReader();
int i = 0;
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
break;
}
}
Mock.Get(reader).Verify(o => o.Close());
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void ToEnumerableForEachThrow()
{
var reader = CreateDataReader();
int i = 0;
try
{
foreach (var n in GetObservable(reader).ToEnumerable())
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}
Assert.Fail();
}
catch (Exception exc)
{
Assert.AreEqual("xo-xo", exc.Message);
Mock.Get(reader).Verify(o => o.Close());
}
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void Subscribe()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
}, () =>
{
Assert.AreEqual(100, i);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeCancel()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
var cts = new CancellationTokenSource();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
cts.Cancel();
}
}, e =>
{
Assert.IsTrue(i < 100);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetException(e);
}, () =>
{
Assert.IsTrue(i < 100);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
}, cts.Token);
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeThrow()
{
var reader = CreateDataReader();
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable(reader).Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}, e =>
{
Assert.AreEqual("xo-xo", e.Message);
Mock.Get(reader).Verify(o => o.Close());
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
}
}
Эти модульные тесты фиксируют все возможные варианты использования API, возвращающего IObservable<T>
упаковка для чтения данных:
- Люди могут захотеть полностью реализовать это, используя наши
ToListAsync
метод расширения или.ToEnumerable().ToList()
, - Люди могут захотеть перебрать его, используя
ToEnumerable
метод расширения. True - блокируется, если потребление быстро, и материализует данные во внутренней очереди, если потребление медленно, но, тем не менее, этот сценарий является законным. - Наконец, люди могут использовать наблюдаемое напрямую, подписавшись на него, но в какой-то момент им придется ждать конца (блокируя поток), поскольку большая часть кода по-прежнему синхронна.
Основное требование заключается в том, чтобы считыватель данных был быстро утилизирован после окончания чтения - независимо от способа использования наблюдаемого.
Из всех модульных тестов 4 не пройдены:
SubscribeCancel
а такжеSubscribeThrow
тайм-аут (т.е. тупик)ToEnumerableForEachBreak
а такжеToEnumerableForEachThrow
не удалось проверить утилиту чтения данных.
Ошибка проверки утилизации устройства чтения данных зависит от времени, когда foreach
оставлен (либо через исключение, либо через разрыв) IEnumerator
немедленно удаляется, что в конечном итоге отменяет токен отмены, используемый реализацией наблюдаемого. Однако эта реализация выполняется в другом потоке, и к тому времени, когда она замечает отмену - модульное тестирование уже закончено. В реальном приложении читатель будет правильно и довольно быстро избавлен, но он не синхронизируется с окончанием итерации. Я задаюсь вопросом, возможно ли сделать избавление от вышеупомянутого IEnumerator
экземпляр, чтобы дождаться отмены, замеченной соответствующим IObservable
реализация и читатель утилизируется.
редактировать
Так DbDataReader
является IEnumerable
То есть, если кто-то хочет перечислить объекты синхронно - нет проблем.
Однако что, если я хочу сделать это асинхронно? Мне запрещено перечислять читателя в этом случае - это операция блокировки. Единственный выход - вернуть наблюдаемое. Другие обсуждали эту тему уже и на более хорошем языке, чем я, например, - http://www.interact-sw.co.uk/iangblog/2013/11/29/async-yield-return
Следовательно, я должен вернуть IObservable
и я не могу использовать ToObservable
метод расширения, потому что это зависит от перечисления блокировки читателя.
Далее, учитывая IObservable
кто-то может преобразовать его в IEnumerable
что глупо, учитывая тот факт, что читатель уже IEnumerable
, но выполнимый и законный тем не менее.
Редактировать 2
Отладка кода с помощью.NET Reflector (интегрированного с VS) показывает, что поток проходит через следующий метод:
namespace System.Reactive.Threading.Tasks
{
public static class TaskObservableExtensions
{
...
private static void ToObservableDone<TResult>(Task<TResult> task, AsyncSubject<TResult> subject)
{
switch (task.get_Status())
{
case TaskStatus.RanToCompletion:
subject.OnNext(task.get_Result());
subject.OnCompleted();
return;
case TaskStatus.Canceled:
subject.OnError((Exception) new TaskCanceledException((Task) task));
return;
case TaskStatus.Faulted:
subject.OnError(task.get_Exception().get_InnerException());
return;
}
}
}
}
Как аннулирование токена, так и бросание из OnNext
в асинхронную подписку попадает в этот метод (а также успешное завершение). Как отмена и бросание сходятся к subject.OnError
метод. Этот метод должен в конечном итоге делегировать OnError
обработчик. Но это не так.
Редактировать 3
Следующий Почему обратный вызов OnError никогда не вызывается при броске от данного подписчика? Теперь мне интересно, каким должен быть правильный подход для достижения следующих целей:
- Выставлять объекты, доступные через чтение
SqlDataReader
экземпляр асинхронно - Избегайте материализации объектов. Выбор материализации должен быть в руках вызывающего API.
- API должен быть применим в среде, где асинхронный код смешан с синхронным. Зачем? Поскольку у нас уже есть сервер, использующий синхронный ввод-вывод, и мы хотим постепенно прекратить синхронный блокирующий ввод-вывод с асинхронным.
Имея эти цели передо мной, я придумал что-то вроде этого (см. Код модульного теста):
private static IObservable<int> GetObservable(DbDataReader reader)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
using (reader)
{
while (!cancellationToken.IsCancellationRequested && await reader.ReadAsync(cancellationToken))
{
obs.OnNext((int)reader[0]);
}
}
});
}
Это имеет смысл для вас? Если нет, каковы альтернативы?
Далее я подумал использовать его, как продемонстрировали Subscribe
код модульного теста. Тем не менее, результаты SubcribeCancel
а также SubscribeThrow
показать, что этот шаблон использования является неправильным. Почему обратный вызов OnError никогда не вызывается при отбрасывании от данного подписчика? объясняет почему это неправильно.
Итак, каков правильный путь? Как предотвратить неправильное потребление пользователями API (SubcribeCancel
а также SubscribeThrow
примеры такого некорректного потребления).
1 ответ
SubscribeCancel
SubscribeCancel
терпит неудачу из-за cts
отменить. Это не вызывает OnError
обработчик.
Отмена вашего cts
является синонимом удаления вашей подписки. Уничтожение подписки обуславливает все будущее OnNext
, OnError
, а также OnCompleted
призывы игнорироваться. Поэтому задача никогда не завершается, а тест зависает навсегда.
Решение:
Когда вы отмените cts, установите задачу в правильное состояние.
SubscribeThrow
SubscribeThrow
терпит неудачу из-за исключения в OnNext
обработчик.
Бросать исключение в OnNext
обработчик не передает исключение OnError
обработчик.
Решение:
Не бросайте исключения в свой Subscribe
обработчики. Вместо этого избавьтесь от своей подписки и установите Task
в надлежащем состоянии.
ToEnumerableForEachThrow & ToEnumerableForEachBreak
ToEnumerableForEachThrow
а также ToEnumerableForEachBreak
терпят неудачу из-за состояния гонки.
foreach(...)
на перечислимых вызовет dispose на базовых наблюдаемых, что отменит токен отмены. После этого исключение перехватывается уловом вашего теста (или разрыв просто выходит из foreach), где вы тестируете, чтобы убедиться, что базовый читатель расположен... кроме того, что читатель еще не был удален, потому что наблюдаемое все еще ожидая, пока читатель выдаст следующий результат... только после того, как читатель даст (и наблюдаемые результаты), наблюдаемый цикл вернется и проверит токен отмены. В этот момент наблюдаемый разрывается и выходит из блока использования и избавляется от читателя.
Решение:
Вместо вашего using (...)
заявление, вернуть Disposable
от твоего Observable.Create
, Одноразовые будут утилизированы при утилизации подписки. Это то, что вы хотите. Избавиться от using
Скажите все вместе, и пусть Rx
делай свою работу