Приложение Spark Streaming не выполняет задания одновременно
У меня есть задание потоковой передачи Spark, которое читает из Kafka и сохраняет его в Redshift.
Пакет RDD содержит данные со столбцом «groupId», однако следующий код ниже не выполняется forEach одновременно, он запускает его последовательно в режиме клиента Yarn.
Окружающая среда пряжи:
- Клиентский режим
- Планировщик по умолчанию FIFO
- Экземпляры Exector 3+
- Исполнитель: 2 ядра, 2 ГБ
inputDstream.foreachRDD { eventRdd: RDD[Event] =>
...
// Convert eventRdd to eventDF
val groupIds = eventDF.select("group_id").distinct.collect.flatMap(_.toSeq)
groupIds.par.foreach{ groupId =>
val teventDF = eventDF.where($"group_id" <=> groupId)
val teventDFWithVersion = teventDF.withColumn("schema_id", lit(version))
teventDFWithVersion.write
.format("io.github.spark_redshift_community.spark.redshift")
.options(opts)
.mode("Append")
.save()
}
}
Опять же, операция в groupsIds.par.foreach выполняется последовательно, а не параллельно. По мере увеличения групп мое приложение начинает захлебываться и обрабатывать всплески времени.
Как заставить Spark сохранять мои пакеты данных одновременно?
1 ответ
Драйвер работает на m5.large (2 ЦП), но для приложения драйвера используется только 1 ЦП, поскольку другие службы занимают другой ЦП.
array.par.foreach {} выполняется одновременно в зависимости от количества доступных виртуальных ЦП.
Запуск драйвера с большим количеством ЦП позволяет выполнять больше одновременных операций записи.
Решение: запустите приложение драйвера Spark в клиентском режиме на машине с большим объемом ЦП. Или запустите приложение Spark в режиме кластера с --driver-cores 4.