Есть ли способ использовать Task Parallel Library(TPL) с SQLDataReader?
Мне нравится простота методов расширения Parallel.For и Parallel.ForEach в TPL. Мне было интересно, есть ли способ воспользоваться чем-то похожим или даже немного более продвинутыми Задачами.
Ниже приведено типичное использование SqlDataReader, и мне было интересно, если это возможно, и если да, то как заменить цикл while ниже чем-то в TPL. Поскольку читатель не может предоставить фиксированное количество итераций, метод For extension не возможен, что оставляет дело с Задачами, которые я собираюсь собрать. Я надеялся, что кто-то, возможно, уже занялся этим и решил что-то делать и чего не делать с ADO.net.
using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
conn.Open();
SqlDataReader reader = comm.ExecuteReader();
if (reader.HasRows)
{
while (reader.Read())
{
// Do something with Reader
}
}
}
2 ответа
Ты почти там. Оберните код, который вы опубликовали в функции с этой подписью:
IEnumerable<IDataRecord> MyQuery()
а затем заменить ваш // Do something with Reader
код с этим:
yield return reader;
Теперь у вас есть то, что работает в одном потоке. К сожалению, когда вы читаете результаты запроса, он каждый раз возвращает ссылку на один и тот же объект, и объект просто мутирует сам для каждой итерации. Это означает, что если вы попытаетесь запустить его параллельно, вы получите действительно странные результаты, поскольку параллельное чтение изменяет объект, используемый в разных потоках. Вам нужен код, чтобы взять копию записи для отправки в ваш параллельный цикл.
Однако в этот момент мне нравится пропускать дополнительную копию записи и переходить прямо к строго типизированному классу. Более того, мне нравится использовать универсальный метод, чтобы сделать это:
IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
using (var cn = new SqlConnection("My connection string"))
using (var cmd = new SqlCommand(sql, cn))
{
addParameters(cmd.Parameters);
cn.Open();
using (var rdr = cmd.ExecuteReader())
{
while (rdr.Read())
{
yield return factory(rdr);
}
}
}
}
Предполагая, что ваши фабричные методы создают копию, как и ожидалось, этот код должен быть безопасным для использования в цикле Parallel.ForEach. Вызов метода будет выглядеть примерно так (при условии, что класс Employee имеет статический метод фабрики с именем "Create"):
var UnderPaid = GetData<Employee>(Employee.Create,
"SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary",
p => {
p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
});
Parallel.ForEach(UnderPaid, e => e.GiveRaise());
Важное обновление:
Я не так уверен в этом коде, как когда-то был. Отдельный поток все еще может видоизменить читатель, в то время как другой поток находится в процессе создания его копии. Я мог бы заблокировать это, но я также обеспокоен тем, что другой поток может вызвать обновление читателя после того, как сам оригинал вызвал Read(), но до того, как он начнет делать копию. Поэтому критический раздел здесь состоит из всего цикла while... и на этом этапе вы снова вернулись к однопоточному. Я ожидаю, что есть способ изменить этот код, чтобы он работал так, как ожидается для многопоточных сценариев, но он потребует дополнительного изучения.
Вам будет трудно заменить этот цикл while напрямую. SqlDataReader не является потокобезопасным классом, поэтому его нельзя использовать напрямую из нескольких потоков.
При этом вы могли бы потенциально обработать данные, которые вы прочитали, используя TPL. Здесь есть несколько вариантов. Проще всего сделать свой собственный IEnumerable<T>
реализация, которая работает на читателя, и возвращает класс или структуру, содержащую ваши данные. Затем вы можете использовать PLINQ или Parallel.ForEach
оператор для обработки ваших данных параллельно:
public IEnumerable<MyDataClass> ReadData()
{
using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
conn.Open();
SqlDataReader reader = comm.ExecuteReader();
if (reader.HasRows)
{
while (reader.Read())
{
yield return new MyDataClass(... data from reader ...);
}
}
}
}
Если у вас есть этот метод, вы можете обработать его напрямую через PLINQ или TPL:
Parallel.ForEach(this.ReadData(), data =>
{
// Use the data here...
});
Или же:
this.ReadData().AsParallel().ForAll(data =>
{
// Use the data here...
});