Ограничивает ли Parallel.ForEach количество активных потоков?
Учитывая этот код:
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
DoSomething(someString);
});
Будут ли все 1000 потоков появляться почти одновременно?
6 ответов
Нет, он не запустит 1000 потоков - да, он ограничит количество используемых потоков. Parallel Extensions использует соответствующее количество ядер, исходя из того, сколько у вас физически и сколько уже занято. Он распределяет работу для каждого ядра, а затем использует технику, называемую похищением работы, чтобы позволить каждому потоку эффективно обрабатывать свою собственную очередь, и ему требуется только дорогой межпоточный доступ, когда это действительно необходимо.
Загляните в блог команды PFX, где вы найдете множество информации о том, как она распределяет работу, и много других тем.
Обратите внимание, что в некоторых случаях вы также можете указать желаемую степень параллелизма.
На одноядерном компьютере... Parallel.ForEach разделы (чанки) коллекции, в которой он работает, между несколькими потоками, но это число рассчитывается на основе алгоритма, который учитывает и, по-видимому, постоянно отслеживает работу, выполняемую потоки, которые он выделяет для ForEach. Таким образом, если часть тела ForEach вызывает долго выполняющиеся функции IO-привязки / блокировки, которые заставили бы поток ждать в ожидании, алгоритм порождает больше потоков и перераспределяет коллекцию между ними. Если потоки завершаются быстро и не блокируются, например, на потоках ввода-вывода, например, при простом вычислении некоторых чисел, алгоритм будет увеличивать (или даже уменьшать) количество потоков до точки, где алгоритм считает оптимальной для пропускной способности (среднее завершение). время каждой итерации).
По сути, пул потоков за всеми различными функциями библиотеки Parallel будет определять оптимальное количество потоков для использования. Количество ядер физического процессора составляет только часть уравнения. Между количеством ядер и количеством порождаемых потоков НЕ существует простого отношения один к одному.
Я не считаю документацию по отмене и обработке синхронизирующих потоков очень полезной. Надеюсь, MS может предоставить лучшие примеры в MSDN.
Не забывайте, что код тела должен быть написан для работы в нескольких потоках, наряду со всеми обычными соображениями безопасности потоков, среда не абстрагирует этот фактор... пока.
Отличный вопрос В вашем примере уровень распараллеливания довольно низок даже на четырехъядерном процессоре, но при некотором ожидании уровень распараллеливания может стать довольно высоким.
// Max concurrency: 5
[Test]
public void Memory_Operations()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
monitor.Add(monitor.Count);
monitor.TryTake(out int result);
monitorOut.Add(result);
});
Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
Теперь посмотрим, что происходит, когда добавляется ожидающая операция для имитации HTTP-запроса.
// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
monitor.Add(monitor.Count);
System.Threading.Thread.Sleep(1000);
monitor.TryTake(out int result);
monitorOut.Add(result);
});
Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
Я еще не внес никаких изменений, и уровень параллелизма / параллелизации резко вырос. Параллельность может быть увеличена с помощью ParallelOptions.MaxDegreeOfParallelism
,
// Max concurrency: 43
[Test]
public void Test()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(monitor.Count);
System.Threading.Thread.Sleep(1000);
monitor.TryTake(out int result);
monitorOut.Add(result);
});
Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
// Max concurrency: 391
[Test]
public void Test()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(monitor.Count);
System.Threading.Thread.Sleep(100000);
monitor.TryTake(out int result);
monitorOut.Add(result);
});
Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
Я рекомендую установку ParallelOptions.MaxDegreeOfParallelism
, Это не обязательно увеличит количество используемых потоков, но обеспечит запуск только нормального количества потоков, что, по-видимому, является вашей заботой.
Наконец, чтобы ответить на ваш вопрос, нет, вы не получите все темы для запуска сразу. Используйте Parallel.Invoke, если вы хотите, чтобы параллельный запуск происходил идеально, например, при тестировании условий гонки.
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
ConcurrentBag<string> monitor = new ConcurrentBag<string>();
ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(DateTime.UtcNow.Ticks.ToString());
monitor.TryTake(out string result);
monitorOut.Add(result);
});
var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}
См. Использует ли Parallel.For одну задачу на итерацию? для идеи "ментальной модели" для использования. Однако автор заявляет, что "в конце концов, важно помнить, что детали реализации могут измениться в любое время".
Он вырабатывает оптимальное количество потоков в зависимости от количества процессоров / ядер. Они не все будут появляться одновременно.
Ограничивает ли Parallel.ForEach количество активных потоков?
Если вы не настроитес положительным, ответ - нет. С конфигурацией по умолчанию и при условии, чтоsource
последовательность с достаточно большим размером, будет использовать все потоки, которые сразу доступны в, и будет постоянно просить больше. Сам по себе не накладывает никаких ограничений на количество потоков. Она ограничена только возможностями связанногоTaskScheduler
.
- По умолчанию
MaxDegreeOfParallelism
ParallelOptions.MaxDegreeOfParallelism
является-1
, что означает неограниченный параллелизм. - По умолчанию
ParallelOptions.TaskScheduler
этоThreadPoolTaskScheduler
, более известный какTaskScheduler.Default
.
Поэтому, если вы хотите понять поведение несконфигурированного файла . Который достаточно прост, чтобы его можно было описать в одном абзаце. Немедленно удовлетворяет все запросы на работу, запуская новые потоки, вплоть до мягкого ограничения, которое по умолчанию равноEnvironment.ProcessorCount
. По достижении этого предела дальнейшие запросы ставятся в очередь, а новые потоки создаются со скоростью один новый поток в секунду для удовлетворения спроса¹. Также существует жесткое ограничение на количество потоков, которое на моей машине составляет 32767 потоков. Мягкое ограничение настраивается с помощьюметод. Также удаляются потоки, если их слишком много и нет работы в очереди, примерно с той же скоростью (1 в секунду).
Ниже приведена экспериментальная демонстрация использования всех доступных потоков в . Количество доступных потоков настраивается заранее с помощьюThreadPool.SetMinThreads
, а затем срабатывает и забирает их все:
ThreadPool.SetMinThreads(workerThreads: 100, completionPortThreads: 10);
HashSet<Thread> threads = new();
int concurrency = 0;
int maxConcurrency = 0;
Parallel.ForEach(Enumerable.Range(1, 1500), n =>
{
lock (threads) maxConcurrency = Math.Max(maxConcurrency, ++concurrency);
lock (threads) threads.Add(Thread.CurrentThread);
// Simulate a CPU-bound operation that takes 200 msec
Stopwatch stopwatch = Stopwatch.StartNew();
while (stopwatch.ElapsedMilliseconds < 200) { }
lock (threads) --concurrency;
});
Console.WriteLine($"Number of unique threads: {threads.Count}");
Console.WriteLine($"Maximum concurrency: {maxConcurrency}");
Вывод (после ожидания ~ 5 секунд):
Number of unique threads: 102
Maximum concurrency: 102
КоличествоcompletionPortThreads
не имеет значения для этого теста. Parallel.ForEach
использует потоки, обозначенные как "workerThreads
". 102 потока:
- 100 потоков, которые были созданы сразу по запросу.
- Еще один поток, который был введен после задержки в 1 секунду.
- Основной поток консольного приложения.
¹ Скорость впрыска
ThreadPool
не документируется. Начиная с .NET 7 это один поток в секунду, но это может измениться в будущих версиях .NET.