Как обрабатывать файлы каталогов в параллельной библиотеке Task?
У меня есть сценарий, в котором я должен обрабатывать несколько файлов (например, 30) параллельно на основе ядер процессора. Я должен назначить эти файлы для отдельных задач, основанных на отсутствии ядер процессора. Я не знаю, как сделать начальный и конечный предел каждой задачи для обработки. Например, каждая задача знает, сколько файлов она должна обработать.
private void ProcessFiles(object e)
{
try
{
var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var FilePaths = Directory.EnumerateFiles(diectoryPath);
int numCores = System.Environment.ProcessorCount;
int NoOfTasks = FilePaths.Count() > numCores ? (FilePaths.Count()/ numCores) : FilePaths.Count();
for (int i = 0; i < NoOfTasks; i++)
{
Task.Factory.StartNew(
() =>
{
int startIndex = 0, endIndex = 0;
for (int Count = startIndex; Count < endIndex; Count++)
{
this.ProcessFile(FilePaths);
}
});
}
}
catch (Exception ex)
{
throw;
}
}
2 ответа
Для таких проблем, как ваша, есть параллельные структуры данных, доступные в C#. Вы хотите использовать BlockingCollection и хранить в нем все имена файлов.
Ваша идея подсчета количества задач с использованием числа ядер, доступных на машине, не очень хороша. Зачем? Так как ProcessFile()
может не занимать одинаковое время для каждого файла. Итак, было бы лучше начать количество задач, как количество ядер у вас есть. Затем позвольте каждой задаче прочитать имя файла одно за другим из коллекции BlockingCollection и затем обрабатывать файл, пока коллекция BlockingCollection не станет пустой.
try
{
var directoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var filePaths = CreateBlockingCollection(directoryPath);
//Start the same #tasks as the #cores (Assuming that #files > #cores)
int taskCount = System.Environment.ProcessorCount;
for (int i = 0; i < taskCount; i++)
{
Task.Factory.StartNew(
() =>
{
string fileName;
while (!filePaths.IsCompleted)
{
if (!filePaths.TryTake(out fileName)) continue;
this.ProcessFile(fileName);
}
});
}
}
И CreateBlockingCollection()
будет следующим:
private BlockingCollection<string> CreateBlockingCollection(string path)
{
var allFiles = Directory.EnumerateFiles(path);
var filePaths = new BlockingCollection<string>(allFiles.Count);
foreach(var fileName in allFiles)
{
filePaths.Add(fileName);
}
filePaths.CompleteAdding();
return filePaths;
}
Вам придется изменить ваш ProcessFile()
получить имя файла сейчас вместо того, чтобы брать все пути к файлам и обрабатывать его чанк.
Преимущество этого подхода заключается в том, что теперь ваш процессор не будет перегружен или не будет подписан, а нагрузка будет равномерно сбалансирована.
Я сам не запускал код, поэтому в моем коде может быть какая-то синтаксическая ошибка. Не стесняйтесь исправить ошибку, если вы столкнетесь с любой.
Исходя из моего, по общему признанию, ограниченного понимания TPL, я думаю, что ваш код можно переписать так:
private void ProcessFiles(object e)
{
try
{
var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;
var FilePaths = Directory.EnumerateFiles(diectoryPath);
Parallel.ForEach(FilePaths, path => this.ProcessFile(path));
}
catch (Exception ex)
{
throw;
}
}
С уважением