Метод записи Spark Dataframe для записи множества маленьких файлов
У меня довольно простая работа, покрывающая файлы журналов для паркета. Он обрабатывает 1,1 ТБ данных (разделенных на 64 МБ - 128 МБ файлов - размер нашего блока составляет 128 МБ), что составляет около 12 тысяч файлов.
Работа работает следующим образом:
val events = spark.sparkContext
.textFile(s"$stream/$sourcetype")
.map(_.split(" \\|\\| ").toList)
.collect{case List(date, y, "Event") => MyEvent(date, y, "Event")}
.toDF()
df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")
Он собирает события с общей схемой, преобразует их в DataFrame, а затем записывает как паркет.
У меня проблема в том, что это может вызвать небольшой взрыв ввода-вывода в кластере HDFS, поскольку он пытается создать очень много крошечных файлов.
В идеале я хочу создать только несколько паркетных файлов в разделе "дата".
Как лучше всего это контролировать? Это с помощью coalesce()?
Как это повлияет на количество файлов, созданных в данном разделе? Это зависит от того, сколько исполнителей у меня работает в Spark? (в настоящее время установлено на 100).
4 ответа
Вы должны переделать свой DataFrame
чтобы соответствовать разделению DataFrameWriter
Попробуй это:
df
.repartition($"date")
.write.mode(SaveMode.Append)
.partitionBy("date")
.parquet(s"$path")
В Python вы можете переписать ответ Рафаэля Рота как:
(df
.repartition("date")
.write.mode("append")
.partitionBy("date")
.parquet("{path}".format(path=path)))
Вы также можете рассмотреть возможность добавления дополнительных столбцов в .repartition
чтобы избежать проблем с очень большими перегородками:
(df
.repartition("date", another_column, yet_another_colum)
.write.mode("append")
.partitionBy("date)
.parquet("{path}".format(path=path)))
Простейшим решением будет заменить фактическое разбиение на:
df
.repartition(to_date($"date"))
.write.mode(SaveMode.Append)
.partitionBy("date")
.parquet(s"$path")
Вы также можете использовать более точное разбиение для вашего DataFrame
то есть день и, возможно, час часового диапазона. и тогда вы можете быть менее точным для писателя. Это на самом деле зависит от объема данных.
Вы можете уменьшить энтропию путем разделения DataFrame
и запись с разделом по предложению.
Я столкнулся с той же проблемой, и я мог с помощью coalesce
решил мою проблему.
df
.coalesce(3) // number of parts/files
.write.mode(SaveMode.Append)
.parquet(s"$path")
Для получения дополнительной информации об использовании coalesce
или же repartition
Вы можете обратиться к следующей искре: объединение или передел
Дублируем мой ответ отсюда: /questions/35117878/spark-parketnaya-peregorodka-bolshoe-kolichestvo-fajlov/35117900#35117900
Это работает для меня очень хорошо:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
Он создает N файлов в каждом выходном разделе (каталоге) и (анекдотически) быстрее, чем при использовании coalesce
и (опять же, по моим собственным данным) быстрее, чем только перераспределение на выходе.
Если вы работаете с S3, я также рекомендую делать все на локальных дисках (Spark много делает для создания / переименования / удаления файлов во время записи), и как только все решено, используйте hadoop FileUtil
(или просто aws cli), чтобы скопировать все:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
def copy(
in : String,
out : String,
sparkSession: SparkSession
) = {
FileUtil.copy(
FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
new Path(in),
FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
new Path(out),
false,
sparkSession.sparkContext.hadoopConfiguration
)
}
Как насчет того, чтобы попробовать запустить такие сценарии как задание карты, объединяющее все файлы паркета в один:
$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
-Dmapred.reduce.tasks=1 \
-input "/hdfs/input/dir" \
-output "/hdfs/output/dir" \
-mapper cat \
-reducer cat