ускорение сильно секционированного фрейма данных до s3 на блоках данных
Я веду записную книжку на Databricks, которая создает разделенные фреймы данных PySpark и загружает их в s3. Рассматриваемая таблица содержит ~5000 файлов и имеет общий размер ~5 ГБ (она должна быть разделена таким образом, чтобы Афина могла эффективно запрашивать ее). Моя проблема в том, что запись файлов в s3 кажется скорее последовательной, чем параллельной, и может занять до одного часа. Например:
df.repartition("customer_id")
.write.partitionBy("customer_id")
.mode("overwrite")
.format("parquet")
.save("s3a://mybucket/path-to-table/")
Я запустил свой кластер (i3.xlarge) на AWS со следующей конфигурацией:
spark.hadoop.orc.overwrite.output.file true
spark.databricks.io.directoryCommit.enableLogicalDelete true
spark.sql.sources.commitProtocolClass org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
parquet.enable.summary-metadata false
spark.hadoop.fs.s3.maxRetries 20
spark.databricks.hive.metastore.glueCatalog.enabled true
spark.hadoop.validateOutputSpecs false
mapreduce.fileoutputcommitter.marksuccessfuljobs false
spark.sql.legacy.parquet.datetimeRebaseModeInRead CORRECTED
spark.hadoop.fs.s3.consistent.retryPeriodSeconds 10
spark.speculation true
spark.hadoop.fs.s3.consistent true
spark.hadoop.fs.s3.consistent.retryCount 5
Каков рекомендуемый подход в этом случае, когда у меня есть много небольших файлов, которые мне нужно быстро записать в s3?
2 ответа
Я вижу несколько причин, по которым ваша запись идет медленно и может быть ускорена:
- У вас может быть более 5000 клиентов? Таким образом, с разделением by у вас, вероятно, будет более 5000 разделов. Это может быть очень медленным с Parquet (без таблиц Delta Lake) из-за накладных расходов в метамагазине. Не думаю, что вам нужно столько разделов.
- При 5000 файлов на 5 ГБ каждый файл имеет размер около 1 МБ. Это очень мало. Для этой проблемы размер записываемых файлов должен быть ближе к 100 МБ.
- Параметры кластера по умолчанию очень хорошо спроектированы, мне очень редко нужно их менять, когда я это делаю, я включаю новые функции. Вы должны попытаться решить указанные выше вопросы, а также удалить все эти переопределения в настройках.
- Repartition ("customer_id") и partitionBy("customer_id") избыточны.
Рекомендовать:
- Получив размер файла до ~ 100 МБ, вы можете сделать это с помощью coalesce(), если на предыдущем этапе было создано> 50 разделов.
- Избавьтесь от раздела с помощью customer_id, возможно, вы думаете, что для этого есть веские причины, но маленькие файлы и большое количество разделов убивают вашу производительность.
- Попробуйте открытый формат Delta Lake (например,
CREATE TABLE ... USING DELTA LOCATION ...
. Это ускорит выборочные запросы ваших клиентов, присоединения к customer_id будут ускорены, если вы такжеOPTIMIZE ... ZORDER BY customer_id
и может автоматически оптимизировать размер ваших файлов.
Окончательный результат выглядит намного чище:
df.coalesce(50)
.write
.mode("overwrite")
.format("delta")
.save("s3a://mybucket/path-to-table/")
См. Параметры автоматической оптимизации для автоматизации изменения размера файла: https://docs.databricks.com/delta/optimizations/auto-optimize.html
Таблицы Delta Lake можно использовать с Athena https://docs.databricks.com/delta/presto-integration.html
На ведре s3 вы установили fs.s3a.fast.upload = true
? Ищу аналогичный билет по этой ссылке