Ошибка нехватки памяти при чтении большого файла в Spark 2.1.0
Я хочу использовать spark для чтения большого (51 ГБ) XML-файла (на внешнем жестком диске) в информационном кадре (с помощью плагина spark-xml), выполнить простое сопоставление / фильтрацию, переупорядочить его и затем записать обратно на диск в виде CSV файл.
Но я всегда получаю java.lang.OutOfMemoryError: Java heap space
как бы я не настраивал это.
Я хочу понять, почему увеличение количества разделов не останавливает ошибку OOM
Не следует ли разделить задачу на несколько частей, чтобы каждая отдельная часть была меньше и не вызывала проблем с памятью?
(Спарк не может пытаться заполнить все в памяти и вылетает, если он не подходит, верно??)
Вещи, которые я пробовал:
- перераспределение / объединение (5000 и 10000 разделов) фрейма данных при чтении и записи (начальное значение - 1604)
- используя меньшее количество исполнителей (6, 4, даже с 2 исполнителями я получаю ошибку OOM!)
- уменьшить размер разделенных файлов (по умолчанию выглядит как 33 МБ)
- дать тонны оперативки (все что у меня есть)
- увеличение
spark.memory.fraction
до 0,8 (по умолчанию 0,6) - снижение
spark.memory.storageFraction
до 0,2 (по умолчанию 0,5) - задавать
spark.default.parallelism
до 30 и 40 (по умолчанию 8 для меня) - задавать
spark.files.maxPartitionBytes
до 64M (по умолчанию 128M)
Весь мой код здесь (обратите внимание, я ничего не кеширую):
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)
// filter and select only the cols i'm interested in
val dsout = df2
.where( df2.col("_TypeId") === "1" )
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
).as[Post]
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// more mapping
dsout
.map{
case Post(id,title,body,tags) =>
val body1 = tagPat.replaceAllIn(body,"")
val body2 = whitespacePat.replaceAllIn(body1," ")
Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)
ЗАМЕТКИ
- входное разделение действительно мало (только 33 МБ), так почему я не могу иметь 8 потоков, каждый из которых обрабатывает одно разделение? это действительно не должно взорвать мою память
ОБНОВЛЕНИЕ Я написал более короткую версию кода, который просто читает файл, а затем forEachPartition(println).
Я получаю ту же ошибку OOM:
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema)
.option("rowTag", "row")
.load(s"$pathToInputXML")
.repartition(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
df
.where(df.col("_PostTypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
df("_Tags").as("tags")
).as[Post]
.map {
case Post(id, title, body, tags) =>
Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
}
.foreachPartition { rdd =>
if (rdd.nonEmpty) {
println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
}
}
PS: я использую spark v 2.1.0. У моей машины 8 ядер и 16 ГБ оперативной памяти.
3 ответа
Я получал эту ошибку при запуске spark-shell и, следовательно, увеличил объем памяти драйвера до большого числа. Тогда я смог загрузить XML.
spark-shell --driver-memory 6G
Потому что вы храните свой RDD дважды, и ваша логика должна быть изменена, как это или фильтр SparkSql
val df: DataFrame = SparkFactory.spark.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
.coalesce(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// filter and select only the cols i'm interested in
df
.where( df.col("_TypeId") === "1" )
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
).as[Post]
.map{
case Post(id,title,body,tags) =>
val body1 = tagPat.replaceAllIn(body,"")
val body2 = whitespacePat.replaceAllIn(body1," ")
Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)
Вы можете изменить размер кучи, добавив в переменную окружения следующее:
- Имя переменной среды: _JAVA_OPTIONS
- Значение переменной среды: -Xmx512M -Xms512M