Использование Spark для записи файла паркета в s3 поверх s3a очень медленное
Я пытаюсь написать parquet
подать в Amazon S3
с помощью Spark 1.6.1
, Маленький parquet
что я генерирую ~2GB
однажды написано, так что это не так много данных. Я пытаюсь доказать Spark
в качестве платформы, которую я могу использовать.
По сути, я собираюсь создать star schema
с dataframes
тогда я собираюсь выписать эти таблицы на паркет. Данные поступают из CSV-файлов, предоставленных поставщиком, и я использую Spark в качестве ETL
Платформа. В настоящее время у меня есть 3 узла кластера в ec2(r3.2xlarge)
Так 120GB
памяти на исполнителей и всего 16 ядер.
Размер входных файлов составляет около 22 ГБ, и сейчас я извлекаю около 2 ГБ этих данных. В конце концов, когда я начну загружать полный набор данных, это будет много терабайт.
Вот моя искра / скала pseudocode
:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
Подсчет занимает около 2 минут для 465884512 строк. Запись в паркет занимает 38 минут
Я понимаю что coalesce
перетасовывает драйвер, который выполняет запись.... но количество времени, которое требуется, заставляет меня думать, что я делаю что-то серьезно неправильно. Без coalesce
, это все еще занимает 15 минут, что IMO все еще слишком долго и дает мне тонну маленьких parquet
файлы. Я хотел бы иметь один большой файл в день данных, которые я буду иметь. У меня есть код, чтобы сделать разбиение по значению поля, и это так же медленно. Я также пытался вывести это на csv
и это занимает ~1 час.
Кроме того, я не настраиваю реквизиты времени выполнения, когда отправляю свою работу. Моя консольная статистика для одной работы:
- Живых работников: 2
- Количество используемых ядер: 16 Всего, 16 использованных
- Используемая память: всего 117, 5 ГБ, 107, 5 ГБ использовано
- Приложения: 1 запущено, 5 выполнено
- Водители: 0 Бег, 0 Завершено
- Статус: ЖИВЫЙ
2 ответа
Значения по умолчанию Spark приводят к большому количеству (возможно) ненужных накладных расходов во время операций ввода-вывода, особенно при записи в S3. В этой статье это обсуждается более подробно, но есть 2 параметра, которые вы хотите рассмотреть, чтобы изменить.
Использование DirectParquetOutputCommitter.По умолчанию Spark сохраняет все данные во временную папку, а затем перемещает эти файлы.Использование DirectParquetOutputCommitter сэкономит время за счет прямой записи в выходной путь S3- Больше не доступен в Spark 2.0+
- Как указано в билете JIRA, текущее решение заключается в
- Переключите ваш код на использование s3a и Hadoop 2.7.2+; все лучше, лучше в Hadoop 2.8 и является основой для s3guard
- Используйте Hadoop FileOutputCommitter и установите для mapreduce.fileoutputcommitter.algorithm.version значение 2
- Как указано в билете JIRA, текущее решение заключается в
-Слияние схемы отключено по умолчанию, начиная с версии Spark 1.5.
Отключитеобъединениесхемы.Если объединение схем включено, узел драйвера будет сканировать все файлы для обеспечения согласованности схемы.Это особенно дорого, потому что это не распределенная операция.Убедитесь, что это отключено, выполнивval file = sqx.read.option("mergeSchema", "false").parquet(path)
- Больше не доступен в Spark 2.0+
Коммиттер прямого вывода удален из кодовой базы искры; Вы должны написать свой собственный / воскресить удаленный код в своем собственном JAR. Если вы это сделаете, выключите спекуляции в своей работе и знайте, что другие сбои могут также вызвать проблемы, где проблема - "неверные данные".
Более того, Hadoop 2.8 собирается добавить некоторые ускорения S3A специально для чтения оптимизированных двоичных форматов (ORC, Parquet) с S3; см. HADOOP-11694 для деталей. И некоторые люди работают над использованием Amazon Dynamo для согласованного хранилища метаданных, которое должно быть в состоянии сделать надежную фиксацию O(1) в конце работы.
Одним из непосредственных подходов к ускорению записи Spark в S3 является использование коммиттера, оптимизированного для EMRFS S3.
Однако, если вы используете s3a, этот коммиттер использовать нельзя:
Когда коммиттер, оптимизированный для EMRFS S3, не используется
Коммиттер не используется при следующих обстоятельствах:
When writing to HDFS -> When using the S3A file system When using an output format other than Parquet, such as ORC or text When using MapReduce or Spark's RDD API
Я тестировал эту разницу на AWS EMR 5.26, и использование s3:// было на 15%-30% быстрее, чем s3a:// (но все равно медленным).
Самый быстрый способ, которым мне удалось выполнить такое копирование / запись, - это записать Parquet в локальную HDFS, а затем использовать s3distcp для копирования на S3; в одном конкретном сценарии (несколько сотен небольших файлов) это было в 5 раз быстрее, чем запись DataFrame в Parquet непосредственно в S3.
У меня тоже была эта проблема. Помимо того, что сказали остальные, вот полное объяснение от AWS: https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/
Во время моего эксперимента простой переход на FileOutCommiter v2(с v1) улучшил запись в 3-4 раза.
self.sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")