Parallel.ForEach может вызвать исключение "Недостаточно памяти" при работе с перечислимым с большим объектом

Я пытаюсь перенести базу данных, в которой изображения хранятся в базе данных, в запись в базе данных, указывающую на файл на жестком диске. Я пытался использовать Parallel.ForEach чтобы ускорить процесс, используя этот метод для запроса данных.

Тем не менее, я заметил, что я получаю OutOfMemory Исключение. я знаю Parallel.ForEach будет запрашивать партию перечислимых элементов, чтобы уменьшить затраты на накладные расходы, если есть такая для интервалов между запросами (так что ваш источник с большей вероятностью будет кэшировать следующую запись в памяти, если вы сделаете несколько запросов сразу, а не разнесете их), Проблема связана с тем, что одна из записей, которые я возвращаю, - это байтовый массив размером 1-4 Мб, из-за которого из-за кэширования используется все адресное пространство (программа должна работать в режиме x86, поскольку целевая платформа будет 32-разрядной). машина)

Есть ли способ отключить кэширование или сделать меньше для TPL?


Вот пример программы, чтобы показать проблему. Это должно быть скомпилировано в режиме x86, чтобы показать проблему, если она занимает много времени или не происходит на вашей машине, увеличить размер массива (я нашел 1 << 20 занимает около 30 секунд на моей машине и 4 << 20 было почти мгновенно)

class Program
{

    static void Main(string[] args)
    {
        Parallel.ForEach(CreateData(), (data) =>
            {
                data[0] = 1;
            });
    }

    static IEnumerable<byte[]> CreateData()
    {
        while (true)
        {
            yield return new byte[1 << 20]; //1Mb array
        }
    }
}

3 ответа

Решение

Параметры по умолчанию для Parallel.ForEach хорошо работают только тогда, когда задача связана с ЦП и масштабируется линейно. Когда задача связана с процессором, все работает отлично. Если у вас есть четырехъядерный процессор и другие процессы не запущены, то Parallel.ForEach использует все четыре процессора. Если у вас есть четырехъядерный процессор и какой-то другой процесс на вашем компьютере использует один полный процессор, то Parallel.ForEach использует примерно три процессора.

Но если задача не связана с процессором, то Parallel.ForEach продолжает запускать задачи, стараясь изо всех сил держать все процессоры занятыми. Тем не менее, независимо от того, сколько задач выполняется параллельно, всегда остается больше неиспользуемой мощности процессора, и поэтому он продолжает создавать задачи.

Как вы можете определить, связана ли ваша задача с процессором? Надеюсь, просто проверив это. Если вы учитываете простые числа, это очевидно. Но другие случаи не так очевидны. Эмпирическим способом определить, связана ли ваша задача с ЦП, является ограничение максимальной степени параллелизма с помощью ParallelOptions.MaximumDegreeOfParallelism и понаблюдайте, как ведет себя ваша программа. Если ваша задача связана с процессором, то вы должны увидеть такой шаблон в четырехъядерной системе:

  • ParallelOptions.MaximumDegreeOfParallelism = 1: используйте один полный процессор или 25% загрузки процессора
  • ParallelOptions.MaximumDegreeOfParallelism = 2: используйте два процессора или 50% загрузки процессора
  • ParallelOptions.MaximumDegreeOfParallelism = 4: использовать все процессоры или использовать процессор на 100%

Если это ведет себя так, то вы можете использовать по умолчанию Parallel.ForEach варианты и получить хорошие результаты. Линейное использование процессора означает хорошее планирование задач.

Но если я запускаю ваше приложение на моем Intel i7, я получаю около 20% загрузки ЦП независимо от того, какую максимальную степень параллелизма я установил. Почему это? Так много памяти выделяется, что сборщик мусора блокирует потоки. Приложение связано с ресурсами, а ресурс является памятью.

Аналогично, задача, связанная с вводом-выводом, которая выполняет длительные запросы к серверу базы данных, также никогда не сможет эффективно использовать все ресурсы ЦП, доступные на локальном компьютере. И в таких случаях планировщик задач не может "знать, когда нужно остановить" запуск новых задач.

Если ваша задача не связана с ЦП или загрузка ЦП не масштабируется линейно с максимальной степенью параллелизма, то вам следует посоветовать Parallel.ForEach не запускать слишком много задач одновременно. Самый простой способ - указать число, которое допускает некоторый параллелизм для перекрывающихся задач, связанных с вводом / выводом, но не настолько, чтобы вы подавляли потребность локального компьютера в ресурсах или перегружали любые удаленные серверы. Для получения наилучших результатов необходимо использовать метод проб и ошибок:

static void Main(string[] args)
{
    Parallel.ForEach(CreateData(),
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        (data) =>
            {
                data[0] = 1;
            });
}

Итак, хотя то, что предложил Рик, определенно является важным моментом, я думаю, что еще одна вещь, которую не хватает, - это обсуждение разбиения.

Parallel::ForEach будет использовать по умолчанию Partitioner<T>реализация которого для IEnumerable<T> которая не имеет известной длины, будет использовать стратегию разбиения на фрагменты. Что это означает каждый рабочий поток, который Parallel::ForEach собирается использовать для работы над набором данных будет читать некоторое количество элементов из IEnumerable<T> который затем будет обрабатываться только этим потоком (без учета кражи работы). Это позволяет сэкономить на постоянном возвращении к источнику, выделении новой работы и планировании ее для другого рабочего потока. Так что, как правило, это хорошо. Однако, в вашем конкретном сценарии представьте, что вы работаете на четырехъядерном процессоре, и вы установили MaxDegreeOfParallelism на 4 темы для вашей работы, и теперь каждый из них тянет кусок 100 элементов из вашего IEnumerable<T>, Ну, это 100-400 мегабайт прямо здесь только для этого конкретного рабочего потока, верно?

Итак, как вы решаете это? Легко, ты пишешь кастом Partitioner<T> реализация. Теперь чанкинг по-прежнему полезен в вашем случае, поэтому вы, вероятно, не захотите использовать стратегию разделения по одному элементу, потому что тогда вы добавите накладные расходы со всей необходимой для этого координацией задач. Вместо этого я бы написал настраиваемую версию, которую вы можете настроить через набор настроек, пока не найдете оптимальный баланс для вашей рабочей нагрузки. Хорошая новость заключается в том, что, хотя написание такой реализации довольно просто, вам даже не нужно писать ее самостоятельно, потому что команда PFX уже сделала это и включила ее в проект примеров параллельного программирования.

Эта проблема имеет отношение к разделителям, а не к степени параллелизма. Решение состоит в том, чтобы реализовать пользовательский разделитель данных.

Если набор данных большой, кажется, что моно реализация TPL гарантированно исчерпает память. Это случилось со мной недавно (по сути, я выполнял вышеуказанный цикл и обнаружил, что память линейно увеличивалась, пока не выдало исключение OOM).

После отслеживания проблемы я обнаружил, что по умолчанию mono разделит перечислитель с помощью класса EnumerablePartitioner. Этот класс имеет поведение, заключающееся в том, что каждый раз, когда он передает данные задаче, он "группирует" данные с постоянно увеличивающимся (и неизменяемым) фактором, равным 2. Таким образом, в первый раз, когда задача запрашивает данные, она получает кусок размера 1, в следующий раз размером 2*1=2, в следующий раз 2*2=4, затем 2*4=8 и т. Д. И т. Д. В результате объем данных, переданных задаче, и, следовательно, сохраненных в объем памяти одновременно увеличивается с увеличением длины задачи, и если обрабатывается много данных, неизбежно возникает исключение нехватки памяти.

Предположительно, первоначальная причина такого поведения состоит в том, что он хочет избежать того, чтобы каждый поток возвращался несколько раз для получения данных, но, похоже, он основан на предположении, что все обрабатываемые данные могут помещаться в память (не в случае чтения из большие файлы).

Эту проблему можно избежать с помощью специального разделителя, как указано ранее. Вот один общий пример того, который просто возвращает данные каждой задаче по одному элементу за раз:

https://gist.github.com/evolvedmicrobe/7997971

Просто сначала создайте экземпляр этого класса и передайте его Parallel.For вместо самого перечисляемого

Хотя использование специального разделителя, несомненно, является наиболее "правильным" ответом, более простое решение - позволить сборщику мусора наверстать упущенное. В том случае, когда я пытался, я делал многократные вызовы цикла parallel.for внутри функции. Несмотря на выход из функции каждый раз, объем памяти, используемый программой, продолжал линейно увеличиваться, как описано здесь. Я добавил:

//Force garbage collection.
GC.Collect();
// Wait for all finalizers to complete before continuing.
GC.WaitForPendingFinalizers();

и хотя он не очень быстрый, он решил проблему с памятью. Предположительно, при высокой загрузке процессора и памяти сборщик мусора работает неэффективно.

Другие вопросы по тегам