Spark вытягивает данные в RDD или массив данных или набор данных

Я пытаюсь выразить простым языком, когда искра протягивает данные через драйвер, а затем, когда искре не нужно извлекать данные через драйвер.

У меня 3 вопроса -

  1. Давайте в один прекрасный день у вас есть файл размером 20 ТБ, сохраненный в HDFS, и из программы драйвера вы извлекаете его во фрейм данных или в RDD, используя одну из соответствующих библиотек "из коробки" (sc.textfile(path) или же sc.textfile(path).toDF, так далее). Приведет ли это к тому, что программа драйвера будет иметь OOM, если драйвер работает только с 32 ГБ памяти? Или хотя бы есть перестановки на драйвере Джима? Или spark и hadoop будут достаточно умны, чтобы распределять данные из HDFS в искровом исполнителе для создания фрейма данных /RDD без прохождения драйвера?
  2. Точно такой же вопрос, как 1, кроме как из внешней СУБД?
  3. Точно такой же вопрос, как 1, за исключением файловой системы определенных узлов (просто файловая система Unix, файл 20 ТБ, но не HDFS)?

1 ответ

Решение

Относительно 1

Spark работает с распределенной структурой данных, такой как RDD и Dataset (и Dataframe до 2.0). Вот факты, которые вы должны знать об этих структурах данных, чтобы получить ответ на свой вопрос:

  1. Все операции преобразования, такие как (карта, фильтр и т. Д.), Являются ленивыми. Это означает, что чтение не будет выполняться, если вам не требуется конкретный результат ваших операций (например, уменьшить, сложить или сохранить результат в некотором файле).
  2. При обработке файла в HDFS Spark работает с файловыми разделами. Раздел - это минимальный логический пакет данных, который может быть обработан. Обычно один раздел равен одному блоку HDFS, и общее количество разделов никогда не может быть меньше количества блоков в файле. Общий (и используемый по умолчанию) размер блока HDFS составляет 128 МБ.
  3. Все фактические вычисления (включая чтение из HDFS) в RDD и Dataset выполняются внутри исполнителей, а не в драйвере. Драйвер создает DAG и логический план выполнения и назначает исполнителям задачи для дальнейшей обработки.
  4. Каждый исполнитель запускает ранее назначенную задачу для определенного раздела данных. Поэтому обычно, если вы выделяете только одно ядро ​​вашему исполнителю, он обрабатывает не более 128 МБ (размер блока HDFS по умолчанию) данных одновременно.

Так что в основном, когда вы вызываете sc.textFile фактическое чтение не происходит. Все упомянутые факты объясняют, почему ООМ не возникает при обработке даже 20 Тб данных.

Есть несколько особых случаев, таких как join операции. Но даже в этом случае все исполнители сбрасывают свои промежуточные результаты на локальный диск для дальнейшей обработки.

Относительно 2

В случае JDBC вы можете решить, сколько разделов у вас будет для вашей таблицы. И выберите соответствующий ключ раздела в своей таблице, который будет правильно разделять данные на разделы. Вам решать, сколько данных будет загружено в память одновременно.

Относительно 3

Размер блока локального файла контролируется fs.local.block.size свойство (я думаю, 32Mb по умолчанию). Таким образом, это в основном то же самое, что и 1 (файл HDFS), за исключением того факта, что вы будете читать все данные с одного компьютера и одного физического диска (что крайне неэффективно в случае файла размером 20 ТБ).

Другие вопросы по тегам