Как раздел Spark (ING) работает с файлами в HDFS?

Я работаю с Apache Spark на кластере с использованием HDFS. Насколько я понимаю, HDFS распределяет файлы по дата-узлам. Поэтому, если в файловую систему положить файл "file.txt", он будет разбит на разделы. Теперь я звоню

rdd = SparkContext().textFile("hdfs://.../file.txt") 

от Apache Spark. Имеет ли rdd теперь те же разделы, что и "file.txt" в файловой системе? Что происходит, когда я звоню

rdd.repartition(x)

где x > тогда разделы, используемые hdfs? Будет ли Spark физически переставлять данные в формате hdf для локальной работы?

Пример: я поместил 30 ГБ текстовый файл в систему HDFS, которая распределяет его по 10 узлам. Будет ли Spark а) использовать те же 10 разделов? и б) перемешать 30 ГБ через кластер, когда я вызываю перераспределение (1000)?

3 ответа

Решение

Когда Spark считывает файл из HDFS, он создает один раздел для одного входного разбиения. Разделение входов задается Hadoop InputFormat раньше читал этот файл. Например, если вы используете textFile() это было бы TextInputFormat в Hadoop, который вернет вам один раздел для одного блока HDFS (но разделение между разделами будет выполнено с разделением строк, а не с точным разделением блоков), если у вас нет сжатого текстового файла. В случае сжатого файла вы получите один раздел для одного файла (так как сжатые текстовые файлы не разделяются).

Когда вы звоните rdd.repartition(x) он будет выполнять перемешивание данных из N разделы у вас есть в rdd в x разделы, которые вы хотите иметь, разделение будет сделано на основе циклического перебора.

Если у вас есть несжатый текстовый файл объемом 30 ГБ, хранящийся в HDFS, то при настройке размера блока HDFS по умолчанию (128 МБ) он будет храниться в 235 блоках, что означает, что СДР, считанный из этого файла, будет иметь 235 разделов. Когда вы звоните repartition(1000) ваш СДР будет помечен как перераспределенный, но на самом деле он будет перетасовываться до 1000 разделов только тогда, когда вы выполните действие поверх этого СДР (концепция отложенного выполнения)

При чтении файлов HDFS без буфера (например, паркета) с помощью spark-sql, количество разделов DataFrame df.rdd.getNumPartitions зависит от этих факторов:

  • spark.default.parallelism (примерно переводит в #cores, доступные для приложения)
  • spark.sql.files.maxPartitionBytes (по умолчанию 128 МБ)
  • spark.sql.files.openCostInBytes (по умолчанию 4 МБ)

Грубая оценка количества разделов:

  • Если у вас достаточно ядер для чтения всех ваших данных параллельно (то есть как минимум одно ядро ​​на каждые 128 МБ ваших данных)

    AveragePartitionSize ≈ min(4MB, TotalDataSize/#cores) NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize

  • Если вам не хватает ядер,

    AveragePartitionSize ≈ 128MB NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize

Точные вычисления немного сложны и могут быть найдены в базе кода для FileSourceScanExec, см. Здесь.

Вот снимок "Как блоки в HDFS загружаются в работников Spark как разделы"

На этих изображениях 4 блока HDFS загружены как разделы Spark внутри памяти 3 рабочих

Набор данных в HDFS разбит на разделы


Пример: я поместил 30 ГБ текстовый файл в систему HDFS, которая распределяет его по 10 узлам.

Уилл Спарк

а) использовать те же 10 разделов?

Spark загружает те же 10 HDFS-блоков в рабочую память как разделы. Я предполагаю, что размер блока 30 ГБ должен составлять 3 ГБ, чтобы получить 10 разделов / блоков (с настройкой по умолчанию)

б) перемешать 30 ГБ через кластер, когда я вызываю перераспределение (1000)?

Да, Spark перемешивает данные между рабочими узлами, чтобы создать 1000 разделов в рабочей памяти.

Замечания:

HDFS Block -> Spark partition   : One block can represent as One partition (by default)
Spark partition -> Workers      : Many/One partitions can present in One workers 

Дополнение к @0x0FFF. Если он берется из HDFS в качестве входного файла, он будет рассчитываться примерно так rdd = SparkContext().textFile("hdfs://.../file.txt") и когда вы делаете rdd.getNumPatitions это приведет Max(2, Number of HDFS block), Я провел много экспериментов и нашел это как результат. Опять явно вы можете сделать rdd = SparkContext().textFile("hdfs://.../file.txt", 400) чтобы получить 400 в качестве разделов или даже может сделать переразделение путем rdd.repartition или уменьшить до 10 на rdd.coalesce(10)

Другие вопросы по тегам