Метод записи Spark Dataframe для записи множества маленьких файлов

У меня довольно простая работа, покрывающая файлы журналов для паркета. Он обрабатывает 1,1 ТБ данных (разделенных на 64 МБ - 128 МБ файлов - размер нашего блока составляет 128 МБ), что составляет около 12 тысяч файлов.

Работа работает следующим образом:

 val events = spark.sparkContext
  .textFile(s"$stream/$sourcetype")
  .map(_.split(" \\|\\| ").toList)
  .collect{case List(date, y, "Event") => MyEvent(date, y, "Event")}
  .toDF()

df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")

Он собирает события с общей схемой, преобразует их в DataFrame, а затем записывает как паркет.

У меня проблема в том, что это может вызвать небольшой взрыв ввода-вывода в кластере HDFS, поскольку он пытается создать очень много крошечных файлов.

В идеале я хочу создать только несколько паркетных файлов в разделе "дата".

Как лучше всего это контролировать? Это с помощью coalesce()?

Как это повлияет на количество файлов, созданных в данном разделе? Это зависит от того, сколько исполнителей у меня работает в Spark? (в настоящее время установлено на 100).

4 ответа

Решение

Вы должны переделать свой DataFrame чтобы соответствовать разделению DataFrameWriter

Попробуй это:

df
.repartition($"date")
.write.mode(SaveMode.Append)
.partitionBy("date")
.parquet(s"$path")

В Python вы можете переписать ответ Рафаэля Рота как:

(df
  .repartition("date")
  .write.mode("append")
  .partitionBy("date")
  .parquet("{path}".format(path=path)))

Вы также можете рассмотреть возможность добавления дополнительных столбцов в .repartition чтобы избежать проблем с очень большими перегородками:

(df
  .repartition("date", another_column, yet_another_colum)
  .write.mode("append")
  .partitionBy("date)
  .parquet("{path}".format(path=path)))

Простейшим решением будет заменить фактическое разбиение на:

df
 .repartition(to_date($"date"))
 .write.mode(SaveMode.Append)
 .partitionBy("date")
 .parquet(s"$path")

Вы также можете использовать более точное разбиение для вашего DataFrame то есть день и, возможно, час часового диапазона. и тогда вы можете быть менее точным для писателя. Это на самом деле зависит от объема данных.

Вы можете уменьшить энтропию путем разделения DataFrame и запись с разделом по предложению.

Я столкнулся с той же проблемой, и я мог с помощью coalesce решил мою проблему.

df
  .coalesce(3) // number of parts/files 
  .write.mode(SaveMode.Append)
  .parquet(s"$path")

Для получения дополнительной информации об использовании coalesce или же repartition Вы можете обратиться к следующей искре: объединение или передел

Дублируем мой ответ отсюда: /questions/35117878/spark-parketnaya-peregorodka-bolshoe-kolichestvo-fajlov/35117900#35117900

Это работает для меня очень хорошо:

data.repartition(n, "key").write.partitionBy("key").parquet("/location")

Он создает N файлов в каждом выходном разделе (каталоге) и (анекдотически) быстрее, чем при использовании coalesce и (опять же, по моим собственным данным) быстрее, чем только перераспределение на выходе.

Если вы работаете с S3, я также рекомендую делать все на локальных дисках (Spark много делает для создания / переименования / удаления файлов во время записи), и как только все решено, используйте hadoop FileUtil (или просто aws cli), чтобы скопировать все:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
  def copy(
          in : String,
          out : String,
          sparkSession: SparkSession
          ) = {
    FileUtil.copy(
      FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
      new Path(in),
      FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
      new Path(out),
      false,
      sparkSession.sparkContext.hadoopConfiguration
    )
  }

Как насчет того, чтобы попробовать запустить такие сценарии как задание карты, объединяющее все файлы паркета в один:

$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
 -Dmapred.reduce.tasks=1 \
 -input "/hdfs/input/dir" \
 -output "/hdfs/output/dir" \
 -mapper cat \
 -reducer cat
Другие вопросы по тегам