Многопоточное приложение C# с вызовами базы данных SQL Server
У меня есть база данных SQL Server с 500000 записей в таблице main
, Есть также три других таблицы child1
, child2
, а также child3
, Много ко многим отношениям между child1
, child2
, child3
, а также main
реализуются через три таблицы отношений: main_child1_relationship
, main_child2_relationship
, а также main_child3_relationship
, Мне нужно прочитать записи в main
, Обновить main
, а также вставлять в таблицы отношений новые строки, а также вставлять новые записи в дочерние таблицы. Записи в дочерних таблицах имеют ограничения уникальности, поэтому псевдокод для фактического вычисления (CalculateDetails) будет выглядеть примерно так:
for each record in main
{
find its child1 like qualities
for each one of its child1 qualities
{
find the record in child1 that matches that quality
if found
{
add a record to main_child1_relationship to connect the two records
}
else
{
create a new record in child1 for the quality mentioned
add a record to main_child1_relationship to connect the two records
}
}
...repeat the above for child2
...repeat the above for child3
}
Это прекрасно работает как однопоточное приложение. Но это слишком медленно. Обработка в C# довольно трудоемка и занимает слишком много времени. Я хочу превратить это в многопоточное приложение.
Каков наилучший способ сделать это? Мы используем Linq to Sql.
До сих пор мой подход заключался в создании нового DataContext
объект для каждой партии записей из main
и использовать ThreadPool.QueueUserWorkItem
обработать это. Однако эти пакеты наступают друг на друга, потому что один поток добавляет запись, а затем следующий поток пытается добавить ту же самую, и... Я получаю все виды интересных блокировок SQL Server.
Вот код:
int skip = 0;
List<int> thisBatch;
Queue<List<int>> allBatches = new Queue<List<int>>();
do
{
thisBatch = allIds
.Skip(skip)
.Take(numberOfRecordsToPullFromDBAtATime).ToList();
allBatches.Enqueue(thisBatch);
skip += numberOfRecordsToPullFromDBAtATime;
} while (thisBatch.Count() > 0);
while (allBatches.Count() > 0)
{
RRDataContext rrdc = new RRDataContext();
var currentBatch = allBatches.Dequeue();
lock (locker)
{
runningTasks++;
}
System.Threading.ThreadPool.QueueUserWorkItem(x =>
ProcessBatch(currentBatch, rrdc));
lock (locker)
{
while (runningTasks > MAX_NUMBER_OF_THREADS)
{
Monitor.Wait(locker);
UpdateGUI();
}
}
}
А вот и ProcessBatch:
private static void ProcessBatch(
List<int> currentBatch, RRDataContext rrdc)
{
var topRecords = GetTopRecords(rrdc, currentBatch);
CalculateDetails(rrdc, topRecords);
rrdc.Dispose();
lock (locker)
{
runningTasks--;
Monitor.Pulse(locker);
};
}
А также
private static List<Record> GetTopRecords(RecipeRelationshipsDataContext rrdc,
List<int> thisBatch)
{
List<Record> topRecords;
topRecords = rrdc.Records
.Where(x => thisBatch.Contains(x.Id))
.OrderBy(x => x.OrderByMe).ToList();
return topRecords;
}
CalculateDetails
лучше всего объясняется псевдокодом вверху.
Я думаю, что должен быть лучший способ сделать это. Пожалуйста помоги. Большое спасибо!
7 ответов
Вот мой взгляд на проблему:
При использовании нескольких потоков для вставки / обновления / запроса данных в SQL Server или в любой базе данных взаимоблокировки становятся реальностью. Вы должны предполагать, что они произойдут, и обращаться с ними соответствующим образом.
Это не значит, что мы не должны пытаться ограничивать возникновение тупиков. Тем не менее, легко прочитать об основных причинах взаимоблокировок и принять меры по их предотвращению, но SQL Server всегда вас удивит:-)
Некоторые причины тупиков:
Слишком много потоков - постарайтесь ограничить количество потоков до минимума, но, конечно, нам нужно больше потоков для максимальной производительности.
Недостаточно индексов. Если выборки и обновления недостаточно избирательны, SQL будет снимать блокировки большего диапазона, чем это полезно. Попробуйте указать соответствующие индексы.
Слишком много индексов. Обновление индексов вызывает взаимоблокировки, поэтому постарайтесь сократить индексы до необходимого минимума.
Слишком высокий уровень изоляции транзакций. Уровень изоляции по умолчанию при использовании.NET - "Сериализуемый", тогда как по умолчанию при использовании SQL Server - "Чтение зафиксировано". Снижение уровня изоляции может очень помочь (если уместно, конечно).
Вот как я могу решить вашу проблему:
Я бы не катил свое собственное решение для потоков, я бы использовал библиотеку TaskParallel. Мой основной метод будет выглядеть примерно так:
using (var dc = new TestDataContext()) { // Get all the ids of interest. // I assume you mark successfully updated rows in some way // in the update transaction. List<int> ids = dc.TestItems.Where(...).Select(item => item.Id).ToList(); var problematicIds = new List<ErrorType>(); // Either allow the TaskParallel library to select what it considers // as the optimum degree of parallelism by omitting the // ParallelOptions parameter, or specify what you want. Parallel.ForEach(ids, new ParallelOptions {MaxDegreeOfParallelism = 8}, id => CalculateDetails(id, problematicIds)); }
Выполните метод CalculateDetails с повторными попытками устранения ошибок взаимоблокировки
private static void CalculateDetails(int id, List<ErrorType> problematicIds) { try { // Handle deadlocks DeadlockRetryHelper.Execute(() => CalculateDetails(id)); } catch (Exception e) { // Too many deadlock retries (or other exception). // Record so we can diagnose problem or retry later problematicIds.Add(new ErrorType(id, e)); } }
Основной метод CalculateDetails
private static void CalculateDetails(int id) { // Creating a new DeviceContext is not expensive. // No need to create outside of this method. using (var dc = new TestDataContext()) { // TODO: adjust IsolationLevel to minimize deadlocks // If you don't need to change the isolation level // then you can remove the TransactionScope altogether using (var scope = new TransactionScope( TransactionScopeOption.Required, new TransactionOptions {IsolationLevel = IsolationLevel.Serializable})) { TestItem item = dc.TestItems.Single(i => i.Id == id); // work done here dc.SubmitChanges(); scope.Complete(); } } }
И, конечно, моя реализация помощника повтора тупика
public static class DeadlockRetryHelper { private const int MaxRetries = 4; private const int SqlDeadlock = 1205; public static void Execute(Action action, int maxRetries = MaxRetries) { if (HasAmbientTransaction()) { // Deadlock blows out containing transaction // so no point retrying if already in tx. action(); } int retries = 0; while (retries < maxRetries) { try { action(); return; } catch (Exception e) { if (IsSqlDeadlock(e)) { retries++; // Delay subsequent retries - not sure if this helps or not Thread.Sleep(100 * retries); } else { throw; } } } action(); } private static bool HasAmbientTransaction() { return Transaction.Current != null; } private static bool IsSqlDeadlock(Exception exception) { if (exception == null) { return false; } var sqlException = exception as SqlException; if (sqlException != null && sqlException.Number == SqlDeadlock) { return true; } if (exception.InnerException != null) { return IsSqlDeadlock(exception.InnerException); } return false; } }
Еще одна возможность - использовать стратегию разделения
Если ваши таблицы могут быть естественным образом разделены на несколько отдельных наборов данных, то вы можете использовать разделенные таблицы и индексы SQL Server или вручную разбить существующие таблицы на несколько наборов таблиц. Я бы порекомендовал использовать разбиение SQL Server, так как второй вариант был бы грязным. Также встроенное разбиение доступно только в SQL Enterprise Edition.
Если для вас возможно разбиение, вы можете выбрать схему разбиения, которая разбивает ваши данные, скажем, на 8 различных наборов. Теперь вы можете использовать свой оригинальный однопоточный код, но иметь 8 потоков, каждый из которых предназначен для отдельного раздела. Теперь не будет никаких (или хотя бы минимального количества) взаимоблокировок.
Я надеюсь, что в этом есть смысл.
обзор
Корень вашей проблемы в том, что L2S DataContext, как и ObjectContext Entity Framework, не является поточно-ориентированным. Как поясняется в этом обмене форума MSDN, поддержка асинхронных операций в решениях.NET ORM все еще ожидается, начиная с.NET 4.0; вам придется развернуть собственное решение, которое, как вы обнаружили, не всегда легко сделать, когда ваша инфраструктура предполагает однопоточность.
Я воспользуюсь этой возможностью, чтобы отметить, что L2S построен поверх ADO.NET, который сам полностью поддерживает асинхронную работу - лично я бы предпочел иметь дело непосредственно с этим нижним уровнем и писать сам SQL, просто чтобы убедиться, что Я полностью понял, что происходит по сети.
Решение SQL Server?
При этом, я должен спросить - это должно быть решение C#? Если вы можете составить свое решение из набора операторов вставки / обновления, вы можете просто отправить SQL напрямую, и ваши проблемы с потоками и производительностью исчезнут.* Мне кажется, что ваши проблемы связаны не с реальными преобразованиями данных, сделано, но сосредоточиться вокруг того, чтобы сделать их производительными из.NET. Если.NET удаляется из уравнения, ваша задача становится проще. В конце концов, лучшим решением часто является то, что вы пишете наименьшее количество кода, верно?;)
Даже если ваша логика обновления / вставки не может быть выражена строго в реляционной манере, в SQL Server есть встроенный механизм для перебора записей и выполнения логики - хотя они справедливо порочны для многих случаев использования, курсоры могут Факт подойдет для вашей задачи.
Если это задача, которая должна повторяться неоднократно, вы могли бы значительно выиграть от ее кодирования как хранимой процедуры.
* конечно, долгосрочный SQL-код имеет свои собственные проблемы, такие как повышение блокировки и использование индекса, с которыми вам придется бороться.
Решение C#
Конечно, может быть так, что об этом в SQL не может быть и речи - может быть, решения вашего кода зависят от данных, поступающих, например, из других мест, или, возможно, ваш проект имеет строгое соглашение "не разрешено использовать SQL". Вы упоминаете некоторые типичные ошибки многопоточности, но, не видя ваш код, я не могу помочь с ними конкретно.
Делать это из C#, очевидно, целесообразно, но вам нужно учитывать тот факт, что для каждого звонка, который вы делаете, будет существовать фиксированная величина задержки. Вы можете уменьшить влияние задержки в сети, используя пулы соединений, активируя несколько активных наборов результатов и используя асинхронные методы Begin/End для выполнения ваших запросов. Даже несмотря на все это, вам все равно придется признать, что доставка данных из SQL Server в ваше приложение обходится дорого.
Один из лучших способов не допустить, чтобы ваш код перешагивал через себя, - избегать как можно большего обмена изменяемыми данными между потоками. Это будет означать, что нельзя использовать один и тот же DataContext в нескольких потоках. Следующим лучшим подходом является блокировка критических участков кода, которые касаются общих данных - lock
блокирует весь доступ к DataContext, от первого чтения до окончательной записи. Такой подход может полностью устранить преимущества многопоточности; Скорее всего, вы можете сделать вашу блокировку более мелкозернистой, но вы должны быть предупреждены, что это путь боли.
Намного лучше, чтобы ваши операции были полностью отделены друг от друга. Если вы можете разделить свою логику между "основными" записями, это идеально, то есть, если нет связей между различными дочерними таблицами и если одна запись в "основной" не имеет последствий для Во-вторых, вы можете разделить свои операции на несколько потоков, как это:
private IList<int> GetMainIds()
{
using (var context = new MyDataContext())
return context.Main.Select(m => m.Id).ToList();
}
private void FixUpSingleRecord(int mainRecordId)
{
using (var localContext = new MyDataContext())
{
var main = localContext.Main.FirstOrDefault(m => m.Id == mainRecordId);
if (main == null)
return;
foreach (var childOneQuality in main.ChildOneQualities)
{
// If child one is not found, create it
// Create the relationship if needed
}
// Repeat for ChildTwo and ChildThree
localContext.SaveChanges();
}
}
public void FixUpMain()
{
var ids = GetMainIds();
foreach (var id in ids)
{
var localId = id; // Avoid closing over an iteration member
ThreadPool.QueueUserWorkItem(delegate { FixUpSingleRecord(id) });
}
}
Очевидно, что это такой же игрушечный пример, как и псевдокод в вашем вопросе, но, надеюсь, он заставляет задуматься о том, как распределить задачи так, чтобы между ними не было (или было минимально) общего состояния. Это, я думаю, будет ключом к правильному решению C#.
РЕДАКТИРОВАТЬ Отвечая на обновления и комментарии
Если вы сталкиваетесь с проблемами согласованности данных, я бы посоветовал применять семантику транзакций - вы можете сделать это с помощью System.Transactions.TransactionScope (добавьте ссылку на System.Transactions). С другой стороны, вы могли бы сделать это на уровне ADO.NET, получив доступ к внутреннему соединению и вызвав BeginTransaction
на нем (или как называется метод DataConnection).
Вы также упоминаете тупики. То, что вы боретесь с взаимоблокировками SQL Server, указывает на то, что фактические запросы SQL наступают друг другу на ноги. Не зная, что на самом деле отправляется по телеграфу, сложно сказать подробно, что происходит и как это исправить. Достаточно сказать, что взаимоблокировки SQL возникают в результате SQL-запросов, а не обязательно из потоковых конструкций C# - вам нужно проверить, что именно происходит по проводам. Моя интуиция говорит мне, что если каждая "основная" запись действительно независима от других, тогда не должно быть необходимости в блокировке строк и таблиц, и что Linq to SQL, вероятно, является виновником здесь.
Вы можете получить дамп необработанного SQL, генерируемого L2S в вашем коде, установив DataContext.Log
свойство к чему-то, например, Console.Out. Хотя я никогда не использовал его лично, я понимаю, что LINQPad предлагает средства L2S, и вы можете также получить доступ к SQL там.
SQL Server Management Studio проведет вас до конца - используя Activity Monitor, вы сможете наблюдать за эскалацией блокировки в режиме реального времени. Используя Query Analyzer, вы можете получить представление о том, как SQL Server будет выполнять ваши запросы. С их помощью вы сможете получить четкое представление о том, что делает ваш код на стороне сервера, и, в свою очередь, о том, как его исправить.
Я бы порекомендовал перенести всю обработку XML на сервер SQL. Мало того, что все ваши тупики исчезнут, но вы увидите такое повышение производительности, что вы никогда не захотите возвращаться.
Это будет лучше всего объяснено на примере. В этом примере я предполагаю, что блоб XML уже входит в вашу основную таблицу (я называю это шкафом). Я приму следующую схему:
CREATE TABLE closet (id int PRIMARY KEY, xmldoc ntext)
CREATE TABLE shoe(id int PRIMARY KEY IDENTITY, color nvarchar(20))
CREATE TABLE closet_shoe_relationship (
closet_id int REFERENCES closet(id),
shoe_id int REFERENCES shoe(id)
)
И я ожидаю, что ваши данные (только основная таблица) изначально выглядят так:
INSERT INTO closet(id, xmldoc) VALUES (1, '<ROOT><shoe><color>blue</color></shoe></ROOT>')
INSERT INTO closet(id, xmldoc) VALUES (2, '<ROOT><shoe><color>red</color></shoe></ROOT>')
Тогда вся ваша задача так же проста, как следующее:
INSERT INTO shoe(color) SELECT DISTINCT CAST(CAST(xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) AS color from closet
INSERT INTO closet_shoe_relationship(closet_id, shoe_id) SELECT closet.id, shoe.id FROM shoe JOIN closet ON CAST(CAST(closet.xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) = shoe.color
Но, учитывая, что вы будете выполнять много подобной обработки, вы можете упростить свою жизнь, объявив свой основной BLOB-объект типом XML и еще более упростив его:
INSERT INTO shoe(color)
SELECT DISTINCT CAST(xmldoc.query('//shoe/color/text()') AS nvarchar)
FROM closet
INSERT INTO closet_shoe_relationship(closet_id, shoe_id)
SELECT closet.id, shoe.id
FROM shoe JOIN closet
ON CAST(xmldoc.query('//shoe/color/text()') AS nvarchar) = shoe.color
Возможны дополнительные оптимизации производительности, например, предварительное вычисление неоднократно вызываемых результатов Xpath во временной или постоянной таблице или преобразование начальной совокупности основной таблицы в BULK INSERT, но я не ожидаю, что они вам действительно понадобятся для успеха,
Блокировки сервера sql являются нормальными и ожидаемыми в этом типе сценария - MS рекомендует, чтобы они обрабатывались на стороне приложения, а не на стороне базы данных.
Однако, если вам нужно убедиться, что хранимая процедура вызывается только один раз, вы можете использовать блокировку мьютекса sql с помощью sp_getapplock. Вот пример того, как это реализовать
BEGIN TRAN
DECLARE @mutex_result int;
EXEC @mutex_result = sp_getapplock @Resource = 'CheckSetFileTransferLock',
@LockMode = 'Exclusive';
IF ( @mutex_result < 0)
BEGIN
ROLLBACK TRAN
END
-- do some stuff
EXEC @mutex_result = sp_releaseapplock @Resource = 'CheckSetFileTransferLock'
COMMIT TRAN
Эта проблема может быть решена с помощью LimitedConcurrencyLevelTaskScheduler
public class InOutMessagesController
{
private static LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
private TaskFactory taskFactory = new TaskFactory(scheduler);
private TaskFactory<MyTask<Object[]>> taskFactoryWithResult = new TaskFactory<MyTask<Object[]>>(scheduler);
private ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();
private ConcurrentBag<MyTask<Object[]>> tasksWithResult = new ConcurrentBag<MyTask<Object[]>>();
private ConcurrentBag<int> endedTaskIds = new ConcurrentBag<int>();
private ConcurrentBag<int> endedTaskWithResultIds = new ConcurrentBag<int>();
private Task TaskForgetEndedTasks;
private static object taskForgetLocker = new object();
#region Conveyor
private async void AddTaskVoidToQueue(Task task)
{
try
{
tasks.Add(task);
await taskFactory.StartNew(() => task.Start());
if (TaskForgetEndedTasks == null)
{
ForgetTasks();
}
}
catch (Exception ex)
{
NLogger.Error(ex);
}
}
private async Task<Object[]> AddTaskWithResultToQueue(MyTask<Object[]> task)
{
ForgetTasks();
tasksWithResult.Add(task);
return await taskFactoryWithResult.StartNew(() => { task.Start(); return task; }).Result;
}
private Object[] GetEqualTaskWithResult(string methodName)
{
var equalTask = tasksWithResult.FirstOrDefault(x => x.MethodName == methodName);
if (equalTask == null)
{
return null;
}
else
{
return equalTask.Result;
}
}
private void ForgetTasks()
{
Task.WaitAll(tasks.Where(x => x.Status == TaskStatus.Running || x.Status == TaskStatus.Created || x.Status == TaskStatus.WaitingToRun).ToArray());
lock (taskForgetLocker)
{
if (TaskForgetEndedTasks == null)
{
TaskForgetEndedTasks = new Task(ForgetEndedTasks);
TaskForgetEndedTasks.Start();
}
TaskForgetEndedTasks.Wait();
TaskForgetEndedTasks = null;
}
}
private void ForgetEndedTasks()
{
try
{
var completedTasks = tasks.Where(x => x.IsCompleted || x.IsFaulted || x.IsCanceled);
var completedTasksWithResult = tasksWithResult.Where(x => x.IsCompleted || x.IsFaulted || x.IsCanceled);
if (completedTasks.Count() > 0)
{
foreach (var ts in completedTasks)
{
if (ts.Exception != null)
{
NLogger.Error(ts.Exception);
if (ts.Exception.InnerException != null)
{
NLogger.Error(ts.Exception.InnerException);
}
}
endedTaskIds.Add(ts.Id);
}
if (endedTaskIds.Count != 0)
{
foreach (var t in endedTaskIds)
{
Task ct = completedTasks.FirstOrDefault(x => x.Id == t);
tasks.TryTake(out ct);
}
}
endedTaskIds = new ConcurrentBag<int>();
}
if (completedTasksWithResult.Count() > 0)
{
foreach (var ts in completedTasksWithResult)
{
if (ts.Exception != null)
{
NLogger.Error(ts.Exception);
if (ts.Exception.InnerException != null)
{
NLogger.Error(ts.Exception.InnerException);
}
}
endedTaskWithResultIds.Add(ts.Id);
}
foreach (var t in endedTaskWithResultIds)
{
var ct = tasksWithResult.FirstOrDefault(x => x.Id == t);
tasksWithResult.TryTake(out ct);
}
endedTaskWithResultIds = new ConcurrentBag<int>();
}
}
catch(Exception ex)
{
NLogger.Error(ex);
}
}
#endregion Conveyor
internal void UpdateProduct(List<ProductData> products)
{
var updateProductDataTask = new Task(() => ADOWorker.UpdateProductData(products));
AddTaskVoidToQueue(updateProductDataTask);
}
internal async Task<IEnumerable<ProductData>> GetProduct()
{
string methodName = "GetProductData";
Product_Data[] result = GetEqualTaskWithResult(methodName) as Product_Data[];
if (result == null)
{
var task = new MyTask<Object[]>(ADOWorker.GetProductData, methodName);
result = await AddTaskWithResultToQueue(task) as Product_Data[];
}
return result;
}
}
public class ADOWorker
{
public Object[] GetProductData()
{
entities = new DataContext();
return entities.Product_Data.ToArray();
}
public void UpdateProductData(List<Product_Data> products)
{
entities = new DataContext();
foreach (Product_Data pr_data in products)
{
entities.sp_Product_Data_Upd(pr_data);
}
}
}
Это может быть очевидным, но циклический просмотр каждого кортежа и выполнение вашей работы в контейнере сервлета требует больших затрат на каждую запись.
Если возможно, перенесите часть или всю эту обработку на сервер SQL, переписав свою логику как одну или несколько хранимых процедур.
Если
- У вас нет много времени, чтобы потратить на эту проблему и вам нужно исправить это прямо сейчас
- Вы уверены, что ваш код сделан так, что другой поток НЕ будет изменять одну и ту же запись
- Ты не боишься
Тогда... вы можете просто добавить "БЕЗ БЛОКИРОВКИ" в ваши запросы, чтобы MSSQL не применял блокировки.
Использовать с осторожностью:)
Но, в любом случае, вы не сказали нам, где теряется время (в однопоточной версии). Потому что, если это в коде, я посоветую вам написать все в БД напрямую, чтобы избежать непрерывного обмена данными. Если это в БД, я посоветую проверить индекс (слишком много?), Ввод / вывод, процессор и т. Д.