Как охватить MaxDegreeOfParallelism между несколькими блоками TPL DataFlow?
Я хочу ограничить общее количество запросов, которые я отправляю на сервер базы данных по всем блокам DataFlow, до 30. В следующем сценарии регулирование 30 одновременных задач происходит в каждом блоке, поэтому при выполнении всегда выполняется 60 одновременных задач. Очевидно, что я мог бы ограничить свой параллелизм до 15 на блок, чтобы получить в общей сложности 30, но это не было бы оптимальным.
Как мне сделать эту работу? Должен ли я ограничивать (и блокировать) свои ожидания с помощью SemaphoreSlim и т. Д., Или есть внутренний подход DataFlow, который работает лучше?
public class TPLTest
{
private long AsyncCount = 0;
private long MaxAsyncCount = 0;
private long TaskId = 0;
private object MetricsLock = new object();
public async Task Start()
{
ExecutionDataflowBlockOptions execOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
DataflowLinkOptions linkOption = new DataflowLinkOptions() { PropagateCompletion = true };
var doFirstIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
var doCPUWork = new TransformBlock<Data, Data>(data => DoCPUBoundWork(data));
var doSecondIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
var doProcess = new TransformBlock<Data, string>(i => $"Task finished, ID = : {i.TaskId}");
var doPrint = new ActionBlock<string>(s => Debug.WriteLine(s));
doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
doProcess.LinkTo(doPrint, linkOption);
int taskCount = 150;
for (int i = 0; i < taskCount; i++)
{
await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
}
doFirstIOWorkAsync.Complete();
await doPrint.Completion;
Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
}
private async Task<Data> DoIOBoundWorkAsync(Data data)
{
lock(MetricsLock)
{
AsyncCount++;
if (AsyncCount > MaxAsyncCount)
MaxAsyncCount = AsyncCount;
}
if (data.TaskId <= 0)
data.TaskId = Interlocked.Increment(ref TaskId);
await Task.Delay(data.Delay);
lock (MetricsLock)
AsyncCount--;
return data;
}
private Data DoCPUBoundWork(Data data)
{
data.Step = 1;
return data;
}
}
Класс данных:
public class Data
{
public int Delay { get; set; }
public long TaskId { get; set; }
public int Step { get; set; }
}
Отправная точка:
TPLTest tpl = new TPLTest();
await tpl.Start();
3 ответа
Почему бы вам не собрать все в блок действий, который имеет действительное ограничение?
var count = 0;
var ab1 = new TransformBlock<int, string>(l => $"1:{l}");
var ab2 = new TransformBlock<int, string>(l => $"2:{l}");
var doPrint = new ActionBlock<string>(
async s =>
{
var c = Interlocked.Increment(ref count);
Console.WriteLine($"{c}:{s}");
await Task.Delay(5);
Interlocked.Decrement(ref count);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 15 });
ab1.LinkTo(doPrint);
ab2.LinkTo(doPrint);
for (var i = 100; i > 0; i--)
{
if (i % 3 == 0) await ab1.SendAsync(i);
if (i % 5 == 0) await ab2.SendAsync(i);
}
ab1.Complete();
ab2.Complete();
await ab1.Completion;
await ab2.Completion;
Самое простое решение этой проблемы — настроить все ваши блоки с ограниченным параллелизмом :
TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair(
TaskScheduler.Default, maxConcurrencyLevel: 30).ConcurrentScheduler;
ExecutionDataflowBlockOptions execOption = new()
{
TaskScheduler = scheduler,
MaxDegreeOfParallelism = scheduler.MaximumConcurrencyLevel,
};
TaskScheduler
s может только ограничить параллелизм работы, выполняемой в потоках. Они не могут регулировать асинхронные операции, которые не выполняются в потоках . Таким образом, для обеспечения соблюденияMaximumConcurrencyLevel
политики, к сожалению, вы должны передавать синхронные делегаты всем блокам потока данных. Например:
TransformBlock<Data, Data> doFirstIOWorkAsync = new(data =>
{
return DoIOBoundWorkAsync(data).GetAwaiter().GetResult();
}, execOption);
Это изменение повысит спрос напотоков, поэтому вам лучше увеличить количество потоков, которыеThreadPool
мгновенно создает по запросу более высокое значение, чем значение по умолчаниюEnvironment.ProcessorCount
:
ThreadPool.SetMinThreads(100, 100); // At the start of the program
Я предлагаю это решение не потому, что оно оптимально, а потому, что его легко реализовать. Насколько я понимаю, трата некоторого количества ОЗУ на ~30 потоков, которые будут заблокированы большую часть времени, не окажет заметного негативного влияния на тип приложения, с которым вы работаете.
Это решение, к которому я пришел в итоге (если я не могу понять, как использовать один общий блок DataFlow для маршалинга каждого типа доступа к базе данных):
Я определил SemaphoreSlim на уровне класса:
private SemaphoreSlim ThrottleDatabaseQuerySemaphore = new SemaphoreSlim(30, 30);
Я изменил класс ввода / вывода для вызова класса регулирования:
private async Task<Data> DoIOBoundWorkAsync(Data data)
{
if (data.TaskId <= 0)
data.TaskId = Interlocked.Increment(ref TaskId);
Task t = Task.Delay(data.Delay); ;
await ThrottleDatabaseQueryAsync(t);
return data;
}
Класс регулирования: (У меня также есть универсальная версия процедуры регулирования, потому что я не мог понять, как написать одну процедуру для обработки и Задачи, и Задачи
private async Task ThrottleDatabaseQueryAsync(Task task)
{
await ThrottleDatabaseQuerySemaphore.WaitAsync();
try
{
lock (MetricsLock)
{
AsyncCount++;
if (AsyncCount > MaxAsyncCount)
MaxAsyncCount = AsyncCount;
}
await task;
}
finally
{
ThrottleDatabaseQuerySemaphore.Release();
lock (MetricsLock)
AsyncCount--;
}
}
}