Использование Spark для записи файла паркета в s3 поверх s3a очень медленное

Я пытаюсь написать parquet подать в Amazon S3 с помощью Spark 1.6.1, Маленький parquet что я генерирую ~2GB однажды написано, так что это не так много данных. Я пытаюсь доказать Spark в качестве платформы, которую я могу использовать.

По сути, я собираюсь создать star schema с dataframesтогда я собираюсь выписать эти таблицы на паркет. Данные поступают из CSV-файлов, предоставленных поставщиком, и я использую Spark в качестве ETL Платформа. В настоящее время у меня есть 3 узла кластера в ec2(r3.2xlarge) Так 120GB памяти на исполнителей и всего 16 ядер.

Размер входных файлов составляет около 22 ГБ, и сейчас я извлекаю около 2 ГБ этих данных. В конце концов, когда я начну загружать полный набор данных, это будет много терабайт.

Вот моя искра / скала pseudocode:

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  }

Подсчет занимает около 2 минут для 465884512 строк. Запись в паркет занимает 38 минут

Я понимаю что coalesce перетасовывает драйвер, который выполняет запись.... но количество времени, которое требуется, заставляет меня думать, что я делаю что-то серьезно неправильно. Без coalesce, это все еще занимает 15 минут, что IMO все еще слишком долго и дает мне тонну маленьких parquet файлы. Я хотел бы иметь один большой файл в день данных, которые я буду иметь. У меня есть код, чтобы сделать разбиение по значению поля, и это так же медленно. Я также пытался вывести это на csv и это занимает ~1 час.

Кроме того, я не настраиваю реквизиты времени выполнения, когда отправляю свою работу. Моя консольная статистика для одной работы:

  • Живых работников: 2
  • Количество используемых ядер: 16 Всего, 16 использованных
  • Используемая память: всего 117, 5 ГБ, 107, 5 ГБ использовано
  • Приложения: 1 запущено, 5 выполнено
  • Водители: 0 Бег, 0 Завершено
  • Статус: ЖИВЫЙ

2 ответа

Значения по умолчанию Spark приводят к большому количеству (возможно) ненужных накладных расходов во время операций ввода-вывода, особенно при записи в S3. В этой статье это обсуждается более подробно, но есть 2 параметра, которые вы хотите рассмотреть, чтобы изменить.

  • Использование DirectParquetOutputCommitter. По умолчанию Spark сохраняет все данные во временную папку, а затем перемещает эти файлы. Использование DirectParquetOutputCommitter сэкономит время за счет прямой записи в выходной путь S3

    • Больше не доступен в Spark 2.0+
      • Как указано в билете JIRA, текущее решение заключается в
        1. Переключите ваш код на использование s3a и Hadoop 2.7.2+; все лучше, лучше в Hadoop 2.8 и является основой для s3guard
        2. Используйте Hadoop FileOutputCommitter и установите для mapreduce.fileoutputcommitter.algorithm.version значение 2

    -Слияние схемы отключено по умолчанию, начиная с версии Spark 1.5. Отключите объединение схемы. Если объединение схем включено, узел драйвера будет сканировать все файлы для обеспечения согласованности схемы. Это особенно дорого, потому что это не распределенная операция. Убедитесь, что это отключено, выполнив

    val file = sqx.read.option("mergeSchema", "false").parquet(path)

Коммиттер прямого вывода удален из кодовой базы искры; Вы должны написать свой собственный / воскресить удаленный код в своем собственном JAR. Если вы это сделаете, выключите спекуляции в своей работе и знайте, что другие сбои могут также вызвать проблемы, где проблема - "неверные данные".

Более того, Hadoop 2.8 собирается добавить некоторые ускорения S3A специально для чтения оптимизированных двоичных форматов (ORC, Parquet) с S3; см. HADOOP-11694 для деталей. И некоторые люди работают над использованием Amazon Dynamo для согласованного хранилища метаданных, которое должно быть в состоянии сделать надежную фиксацию O(1) в конце работы.

Одним из непосредственных подходов к ускорению записи Spark в S3 является использование коммиттера, оптимизированного для EMRFS S3.

Однако, если вы используете s3a, этот коммиттер использовать нельзя:

Когда коммиттер, оптимизированный для EMRFS S3, не используется

Коммиттер не используется при следующих обстоятельствах:

When writing to HDFS

-> When using the S3A file system

When using an output format other than Parquet, such as ORC or text

When using MapReduce or Spark's RDD API

Я тестировал эту разницу на AWS EMR 5.26, и использование s3:// было на 15%-30% быстрее, чем s3a:// (но все равно медленным).

Самый быстрый способ, которым мне удалось выполнить такое копирование / запись, - это записать Parquet в локальную HDFS, а затем использовать s3distcp для копирования на S3; в одном конкретном сценарии (несколько сотен небольших файлов) это было в 5 раз быстрее, чем запись DataFrame в Parquet непосредственно в S3.

У меня тоже была эта проблема. Помимо того, что сказали остальные, вот полное объяснение от AWS: https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

Во время моего эксперимента простой переход на FileOutCommiter v2(с v1) улучшил запись в 3-4 раза.

self.sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
Другие вопросы по тегам