Работа Spark просто зависает с большими данными
Я пытаюсь запросить от s3 (15 дней данных). Я пытался запрашивать их отдельно (каждый день), он работает нормально. Хорошо работает и 14 дней. Но когда я запрашиваю 15 дней, задание продолжает работать вечно (зависает), и задача № не обновляется.
Мои настройки:
Я использую кластер из 51 узла r3.4x большой с динамическим распределением и включенным максимальным ресурсом.
Все, что я делаю, это =
val startTime="2017-11-21T08:00:00Z"
val endTime="2017-12-05T08:00:00Z"
val start = DateUtils.getLocalTimeStamp( startTime )
val end = DateUtils.getLocalTimeStamp( endTime )
val days: Int = Days.daysBetween( start, end ).getDays
val files: Seq[String] = (0 to days)
.map( start.plusDays )
.map( d => s"$input_path${DateTimeFormat.forPattern( "yyyy/MM/dd" ).print( d )}/*/*" )
sqlSession.sparkContext.textFile( files.mkString( "," ) ).count
Когда я выполняю то же самое с 14 днями, я получаю 197337380 (количество) и я запускаю 15-й день отдельно и получаю 27676788. Но когда я запрашиваю всего 15 дней, работа зависает
Обновить:
Работа отлично работает с:
var df = sqlSession.createDataFrame(sc.emptyRDD[Row], schema)
for(n <- files ){
val tempDF = sqlSession.read.schema( schema ).json(n)
df = df(tempDF)
}
df.count
Но кто-нибудь может объяснить, почему это работает сейчас, но не раньше?
ОБНОВЛЕНИЕ: После установки mapreduce.input.fileinputformat.split.minsize до 256 ГБ, теперь оно работает нормально.
2 ответа
Динамическое распределение и максимальное выделение ресурсов - это разные настройки, одна из которых будет отключена, когда активна другая. При максимизации выделения ресурсов в EMR запускается 1 исполнитель на узел, и он выделяет все ядра и память этому исполнителю.
Я бы порекомендовал выбрать другой маршрут. Похоже, у вас довольно большой кластер с 51 узлом, но вы не уверены, требуется ли он вообще. Однако для начала следуйте этому практическому правилу, и вы научитесь настраивать эти конфигурации.
- Кластерная память - минимум в 2 раза больше данных, с которыми вы имеете дело.
Теперь предположим, что вам нужно 51 узел, попробуйте ниже:
- В r3.4x есть 16 процессоров - так что вы можете использовать их все, оставив один для ОС и других процессов.
- Установите количество исполнителей на 150 - это выделит 3 исполнителя на узел.
- Установите количество ядер на одного исполнителя до 5 (3 исполнителя на узел)
- Установите в памяти исполнителя примерно общую память хоста /3 = 35G
- Вы должны контролировать параллелизм (разделы по умолчанию), установите для этого числа общее количество ядер, которое у вас есть ~ 800
- Настройте случайные разделы - сделайте это вдвое больше ядер - 1600
Выше конфигурации работали как шарм для меня. Вы можете отслеживать использование ресурсов в Spark UI.
Кроме того, в вашей конфигурации пряжи /etc/hadoop/conf/capacity-scheduler.xml
файл, набор yarn.scheduler.capacity.resource-calculator
в org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
- что позволит Spark действительно работать на полную мощность с этими процессорами. Перезапустите службу пряжи после изменения.
Вы должны увеличить память исполнителя и # executors. Если объем данных огромен, попробуйте увеличить объем памяти драйвера.
Мое предложение состоит в том, чтобы не использовать динамическое распределение ресурсов и позволить ему запускаться и посмотреть, зависает ли он по-прежнему (обратите внимание, что задание spark может использовать все ресурсы кластера и заставить другие приложения испытывать недостаток ресурсов, попробуйте этот подход, когда не выполняется ни одно задание). если он не зависает, это означает, что вы должны поиграть с распределением ресурсов, а затем начать жестко кодировать ресурсы и продолжать увеличивать ресурсы, чтобы вы могли найти лучшее распределение ресурсов, которое вы можете использовать.
Ниже ссылки могут помочь вам понять распределение ресурсов и оптимизацию ресурсов.
http://site.clairvoyantsoft.com/understanding-resource-allocation-configurations-spark-application/