Исключить утечку памяти из кучи на пряже с помощью прямого потока Kafka
Я использую потоковую версию 1.4.0 на Yarn (дистрибутив Apache 2.6.0) с java 1.8.0_45, а также прямой поток Kafka. Я также использую спарк с поддержкой Scala 2.11.
Проблема, с которой я сталкиваюсь, заключается в том, что контейнеры как драйвера, так и исполнителя постепенно увеличивают использование физической памяти до такой степени, что контейнер пряжи уничтожает ее. Я настроил до 192M кучи и 384 свободного места в моем драйвере, но в итоге у него кончается
Память кучи выглядит нормально с регулярными циклами GC. Ни в одном из таких запусков OutOffMemory не встречалось
Infact Я не генерирую трафик в очередях kafka, но это происходит. Вот код, который я использую
object SimpleSparkStreaming extends App {
val conf = new SparkConf()
val ssc = new StreamingContext(conf,Seconds(conf.getLong("spark.batch.window.size",1L)));
ssc.checkpoint("checkpoint")
val topics = Set(conf.get("spark.kafka.topic.name"));
val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)
kafkaStream.foreachRDD(rdd => {
rdd.foreach(x => {
println(x._2)
})
})
kafkaStream.print()
ssc.start()
ssc.awaitTermination()
}
Я запускаю это на CentOS 7. Команда, используемая для отправки искры, следующая
./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf spark.app.name="Simple Spark Kafka application" \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 192m \
--executor-memory 128m \
--executor-cores 1 \
/home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar
Любая помощь с благодарностью
С Уважением,
Apoorva
2 ответа
Попробуйте увеличить ядра исполнителя. В вашем примере единственное ядро предназначено для использования потоковых данных, не оставляя ядер для обработки входящих данных.
Это может быть утечка памяти... Вы пробовали использовать conf.set("spark.executor.extraJavaOptions","-XX:+UseG1GC")?
Это не ответ Kafka, он будет изолирован от Spark и о том, как его система каталогизации некачественная, когда дело доходит до согласованной устойчивости и больших операций. Если вы постоянно выполняете запись на уровне сохраняемости (т. Е. В цикле, повторно сохраняющем DF после большой операции, а затем выполняя ее снова) или выполняете большой запрос (т. Е. InputDF.distinct.count); задание Spark начнет помещать некоторые данные в память и неэффективно удалять устаревшие объекты.
Это означает, что со временем объект, который однажды мог быстро запуститься, будет постоянно замедляться, пока не останется доступной памяти. Для всех, кто находится дома, запускает AWS EMR с большим DataFrame, загруженным в среду, выполните следующий запрос:
var iterator = 1
val endState = 15
var currentCount = 0
while (iterator <= endState) {
currentCount = inputDF.distinct.count
print("The number of unique records are : " + currentCount)
iterator = iterator + 1
}
Во время выполнения задания следите за управлением памятью пользовательского интерфейса Spark, если DF достаточно велик для сеанса, вы начнете замечать снижение времени выполнения с каждым последующим запуском, в основном блоки становятся устаревшими, но Spark не может идентифицировать когда чистить эти блоки.
Лучший способ найти решение этой проблемы - это написать свой DF локально, очистить слой настойчивости и загрузить данные обратно. Это подход к проблеме "кувалдой", но для моего бизнес-кейса это был простое решение, которое привело к увеличению времени выполнения наших больших таблиц на 90% (от 540 минут до примерно 40 с меньшим объемом памяти).
В настоящее время я использую следующий код:
val interimDF = inputDF.action
val tempDF = interimDF.write.format(...).option("...","...").save("...")
spark.catalog.clearCache
val interimDF = spark.read..format(...).option("...","...").save("...").persist
interimDF.count
Вот производная, если вы не теряете DF в дочерних подпроцессах:
val interimDF = inputDF.action
val tempDF = interimDF.write.format(...).option("...","...").save("...")
for ((k,v) <- sc.getPersistentRDDs) {
v.unpersist()
}
val interimDF = spark.read..format(...).option("...","...").save("...").persist
interimDF.count