Ошибка нехватки памяти при чтении большого файла в Spark 2.1.0

Я хочу использовать spark для чтения большого (51 ГБ) XML-файла (на внешнем жестком диске) в информационном кадре (с помощью плагина spark-xml), выполнить простое сопоставление / фильтрацию, переупорядочить его и затем записать обратно на диск в виде CSV файл.

Но я всегда получаю java.lang.OutOfMemoryError: Java heap space как бы я не настраивал это.

Я хочу понять, почему увеличение количества разделов не останавливает ошибку OOM

Не следует ли разделить задачу на несколько частей, чтобы каждая отдельная часть была меньше и не вызывала проблем с памятью?

(Спарк не может пытаться заполнить все в памяти и вылетает, если он не подходит, верно??)

Вещи, которые я пробовал:

  • перераспределение / объединение (5000 и 10000 разделов) фрейма данных при чтении и записи (начальное значение - 1604)
  • используя меньшее количество исполнителей (6, 4, даже с 2 исполнителями я получаю ошибку OOM!)
  • уменьшить размер разделенных файлов (по умолчанию выглядит как 33 МБ)
  • дать тонны оперативки (все что у меня есть)
  • увеличение spark.memory.fraction до 0,8 (по умолчанию 0,6)
  • снижение spark.memory.storageFraction до 0,2 (по умолчанию 0,5)
  • задавать spark.default.parallelism до 30 и 40 (по умолчанию 8 для меня)
  • задавать spark.files.maxPartitionBytes до 64M (по умолчанию 128M)

Весь мой код здесь (обратите внимание, я ничего не кеширую):

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined previously
  .option("rowTag", "row")
  .load(s"$pathToInputXML")

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604

// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)

// filter and select only the cols i'm interested in
val dsout = df2
  .where( df2.col("_TypeId") === "1" )
  .select(
    df("_Id").as("id"),
    df("_Title").as("title"),
    df("_Body").as("body"),
  ).as[Post]

// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r


// more mapping
dsout
 .map{
  case Post(id,title,body,tags) =>

    val body1 = tagPat.replaceAllIn(body,"")
    val body2 = whitespacePat.replaceAllIn(body1," ")

    Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))

}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)

ЗАМЕТКИ

  • входное разделение действительно мало (только 33 МБ), так почему я не могу иметь 8 потоков, каждый из которых обрабатывает одно разделение? это действительно не должно взорвать мою память

ОБНОВЛЕНИЕ Я написал более короткую версию кода, который просто читает файл, а затем forEachPartition(println).

Я получаю ту же ошибку OOM:

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema)
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
  .repartition(numPartitions)

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")

df
  .where(df.col("_PostTypeId") === "1")
  .select(
   df("_Id").as("id"),
   df("_Title").as("title"),
   df("_Body").as("body"),
   df("_Tags").as("tags")
  ).as[Post]
  .map {
    case Post(id, title, body, tags) =>
      Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
  }
  .foreachPartition { rdd =>
    if (rdd.nonEmpty) {
      println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
    }
  }

PS: я использую spark v 2.1.0. У моей машины 8 ядер и 16 ГБ оперативной памяти.

3 ответа

Я получал эту ошибку при запуске spark-shell и, следовательно, увеличил объем памяти драйвера до большого числа. Тогда я смог загрузить XML.

spark-shell --driver-memory 6G

Источник: https://github.com/lintool/warcbase/issues/246

Потому что вы храните свой RDD дважды, и ваша логика должна быть изменена, как это или фильтр SparkSql

 val df: DataFrame = SparkFactory.spark.read
      .option("mode", "DROPMALFORMED")
      .format("com.databricks.spark.xml")
      .schema(customSchema) // defined previously
      .option("rowTag", "row")
      .load(s"$pathToInputXML")
      .coalesce(numPartitions)

    println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
    // prints 1604


    // regexes to clean the text
    val tagPat = "<[^>]+>".r
    val angularBracketsPat = "><|>|<"
    val whitespacePat = """\s+""".r

    // filter and select only the cols i'm interested in
     df
      .where( df.col("_TypeId") === "1" )
      .select(
        df("_Id").as("id"),
        df("_Title").as("title"),
        df("_Body").as("body"),
      ).as[Post]
      .map{
        case Post(id,title,body,tags) =>

          val body1 = tagPat.replaceAllIn(body,"")
          val body2 = whitespacePat.replaceAllIn(body1," ")

          Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))

      }
      .orderBy(rand(SEED)) // random sort
      .write // write it back to disk
      .option("quoteAll", true)
      .mode(SaveMode.Overwrite)
      .csv(output)

Вы можете изменить размер кучи, добавив в переменную окружения следующее:

  1. Имя переменной среды: _JAVA_OPTIONS
  2. Значение переменной среды: -Xmx512M -Xms512M
Другие вопросы по тегам