Приложение Spark Streaming не выполняет задания одновременно

У меня есть задание потоковой передачи Spark, которое читает из Kafka и сохраняет его в Redshift.

Пакет RDD содержит данные со столбцом «groupId», однако следующий код ниже не выполняется forEach одновременно, он запускает его последовательно в режиме клиента Yarn.

Окружающая среда пряжи:

  • Клиентский режим
  • Планировщик по умолчанию FIFO
  • Экземпляры Exector 3+
  • Исполнитель: 2 ядра, 2 ГБ
        inputDstream.foreachRDD { eventRdd: RDD[Event] =>
      ...
      // Convert eventRdd to eventDF
      val groupIds = eventDF.select("group_id").distinct.collect.flatMap(_.toSeq)
      groupIds.par.foreach{ groupId =>
          val teventDF = eventDF.where($"group_id" <=> groupId)
          val teventDFWithVersion = teventDF.withColumn("schema_id", lit(version))
          teventDFWithVersion.write
            .format("io.github.spark_redshift_community.spark.redshift")
            .options(opts)
            .mode("Append")
            .save()
       }
  }

Опять же, операция в groupsIds.par.foreach выполняется последовательно, а не параллельно. По мере увеличения групп мое приложение начинает захлебываться и обрабатывать всплески времени.

Как заставить Spark сохранять мои пакеты данных одновременно?

1 ответ

Драйвер работает на m5.large (2 ЦП), но для приложения драйвера используется только 1 ЦП, поскольку другие службы занимают другой ЦП.

array.par.foreach {} выполняется одновременно в зависимости от количества доступных виртуальных ЦП.

Запуск драйвера с большим количеством ЦП позволяет выполнять больше одновременных операций записи.

Решение: запустите приложение драйвера Spark в клиентском режиме на машине с большим объемом ЦП. Или запустите приложение Spark в режиме кластера с --driver-cores 4.

Другие вопросы по тегам