SPARK - назначить несколько ядер одной задаче в RDD.map в pyspark

Я новичок в SPARK и пытаюсь использовать RDD.map в pyspark для параллельного запуска метода с именем function в среде SPARK (всего 72 ядра в автономном кластере SPARK - один драйвер с 100G RAM и 3 рабочих с каждым 24 ядра и 100G оперативной памяти).

Моя цель - запустить функцию 200 раз и усреднить результаты. Выходные данные функции представляют собой массив numpy.ar размером 12 по num_of_samples (который является огромной переменной с точки зрения памяти).

Моей первой попыткой было создать RDD размером 200, затем использовать RDD.map и уменьшить в конце:

sum_data = sc.parallelize(range(0,200)).map(function).reduce(lambda x,y:x+y)

Несмотря на то, что я установил максимальный уровень памяти драйвера драйвера, на уровне редукции памяти не хватает (наверное, из-за огромного вывода функции numpy.array). Я рассчитал, что максимальное количество элементов, которое я могу поместить в мой RDD, чтобы избежать ошибки памяти, составляет около 40 элементов:

sum_data = sc.parallelize(range(0,40)).map(function).reduce(lambda x,y:x+y)

Теперь, когда я пытаюсь это сделать, я вижу, что SPARK создает 40 задач и назначает ровно одно ядро ​​для каждого из них (используя только 40 ядер из 72 доступных ядер в кластере). Таким образом, остальные 32 ядра простаивают и не используются, что приводит к очень медленному времени выполнения. Мне было интересно, если этот подход является правильным и как я могу сделать RDD.map потреблять все доступные ядра вместо использования одного ядра для каждого отображения?

1 ответ

Я думаю, что этого можно достичь, указав количество разделов, которые искры должны разделить ваш RDDс в. Самый простой способ сделать это - добавить дополнительный numSlices параметр в parallelize вызов метода, это гарантирует, что искра разделит ваши данные на numSlices разделы, и я думаю, что это будет использовать целые ядра.

Пожалуйста, обратитесь к официальной документации для получения дополнительной информации.

Другие вопросы по тегам