Решение проблем с зависимостями в Apache Spark

Общие проблемы при создании и развертывании приложений Spark:

  • java.lang.ClassNotFoundException,
  • object x is not a member of package y ошибки компиляции.
  • java.lang.NoSuchMethodError

Как это можно решить?

5 ответов

Решение

При создании и развертывании приложений Spark для всех зависимостей требуются совместимые версии.

  • Скала версия. Все пакеты должны использовать одну и ту же основную (2.10, 2.11, 2.12) версию Scala.

    Подумайте о следующем (неверно) build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    Мы используем spark-streaming для Scala 2.10, а остальные пакеты для Scala 2.11. Допустимый файл может быть

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.11" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    но лучше указывать версию глобально и использовать %%:

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.11.7"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" %% "spark-core" % "2.0.1",
       "org.apache.spark" %% "spark-streaming" % "2.0.1",
       "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1"
    )
    

    Аналогично в Maven:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
  • Версия Spark Все пакеты должны использовать одну и ту же основную версию Spark (1.6, 2.0, 2.1, ...).

    Подумайте о следующем (неправильном) build.sbt:

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "1.6.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    Мы используем spark-core 1.6 пока остальные компоненты есть в Spark 2.0. Допустимый файл может быть

    name := "Simple Project"
    
    version := "1.0"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % "2.0.1",
       "org.apache.spark" % "spark-streaming_2.10" % "2.0.1",
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1"
    )
    

    но лучше использовать переменную:

    name := "Simple Project"
    
    version := "1.0"
    
    val sparkVersion = "2.0.1"
    
    libraryDependencies ++= Seq(
       "org.apache.spark" % "spark-core_2.11" % sparkVersion,
       "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
       "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion
    )
    

    Аналогично в Maven:

    <project>
      <groupId>com.example</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <properties>
        <spark.version>2.0.1</spark.version>
        <scala.version>2.11</scala.version>
      </properties> 
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency> 
        <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>spark-streaming-twitter_${scala.version}</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    </project>
    
  • Версия Spark, используемая в зависимостях Spark, должна соответствовать версии Spark установки Spark. Например, если вы используете 1.6.1 в кластере, вы должны использовать 1.6.1 для создания jar-файлов. Несоответствия младших версий не всегда принимаются.

  • Версия Scala, используемая для сборки jar, должна соответствовать версии Scala, используемой для сборки развернутого Spark. По умолчанию (загружаемые двоичные файлы и сборки по умолчанию):

    • Spark 1.x -> Scala 2.10
    • Spark 2.x -> Scala 2.11
  • Дополнительные пакеты должны быть доступны на рабочих узлах, если они включены в толстый флягу. Есть несколько вариантов, в том числе:

    • --jars аргумент в пользу spark-submit - раздать местный jar файлы.
    • --packages аргумент в пользу spark-submit - получить зависимости из репозитория Maven.

    При отправке в узле кластера вы должны включить приложение jar в --jars,

Путь к классам в Apache Spark создается динамически (с учетом пользовательского кода приложения), что делает его уязвимым для таких проблем. user7337271 правильный, но есть еще некоторые проблемы, в зависимости от того, какой менеджер кластера ("мастер") вы используете.

Во-первых, приложение Spark состоит из этих компонентов (каждый из них является отдельной JVM, поэтому потенциально содержит разные классы в своем пути к классам):

  1. Драйвер: это ваше приложение, создающее SparkSession (или же SparkContext) и подключение к диспетчеру кластеров для выполнения реальной работы
  2. Диспетчер кластеров: служит "точкой входа" в кластер, отвечающий за распределение исполнителей для каждого приложения. В Spark поддерживается несколько различных типов: standalone, YARN и Mesos, которые мы опишем ниже.
  3. Исполнители: это процессы на узлах кластера, выполняющие реальную работу (выполнение задач Spark)

Соотношение между ними описано в этой диаграмме из обзора кластерного режима Apache Spark:

Обзор режима кластера

Теперь - какие классы должны находиться в каждом из этих компонентов?

На это можно ответить следующей диаграммой:

Обзор размещения классов

Давайте разберем это медленно:

  1. Spark Code - это библиотеки Spark. Они должны существовать во ВСЕХ трех компонентах, поскольку они включают клей, который позволяет Spark осуществлять связь между ними. Кстати, авторы Spark разработали решение включить код для ВСЕХ компонентов во ВСЕ компоненты (например, чтобы включить код, который должен запускаться только в Executor в драйвере), чтобы упростить это - так что "жирная банка" Spark (в версиях до 1.6) или "архив" (в 2.0, подробности ниже) содержат необходимый код для всех компонентов и должны быть доступны во всех них.

  2. Код только для драйвера - это код пользователя, который не включает в себя ничего, что должно использоваться в Executors, то есть код, который не используется ни в каких преобразованиях в RDD / DataFrame / Dataset. Это не обязательно должно быть отделено от распределенного пользовательского кода, но это может быть.

  3. Распределенный код - это код пользователя, который скомпилирован с кодом драйвера, но также должен быть выполнен на Исполнителях - все фактические используемые преобразования должны быть включены в этот файл (ы).

Теперь, когда мы получили это прямо, как нам заставить классы загружаться правильно в каждом компоненте, и каким правилам они должны следовать?

  1. Spark Code: как указано в предыдущих ответах, вы должны использовать одинаковые версии Scala и Spark во всех компонентах.

    1.1 В автономном режиме существует "существующая" установка Spark, к которой могут подключаться приложения (драйверы). Это означает, что все драйверы должны использовать одну и ту же версию Spark, работающую на мастере и исполнителях.

    1.2 В YARN / Mesos каждое приложение может использовать свою версию Spark, но все компоненты одного и того же приложения должны использовать одну и ту же. Это означает, что если вы использовали версию X для компиляции и упаковки вашего драйвера, вы должны предоставить ту же версию при запуске SparkSession (например, через spark.yarn.archive или же spark.yarn.jars параметры при использовании YARN). Предоставленные вами файлы jars / archive должны включать все зависимости Spark (включая транзитивные), и менеджер кластера будет отправлять их каждому исполнителю при запуске приложения.

  2. Код драйвера: все полностью - код драйвера может быть отправлен в виде набора jar или "толстого jar", если он включает все зависимости Spark + весь пользовательский код

  3. Распределенный код: помимо присутствия в драйвере, этот код должен быть отправлен исполнителям (опять же, вместе со всеми его переходными зависимостями). Это делается с помощью spark.jars параметр.

Подводя итог, вот предложенный подход к созданию и развертыванию Spark Application (в данном случае - с использованием YARN):

  • Создайте библиотеку с вашим распределенным кодом, упакуйте ее как "обычный" jar (с файлом.pom, описывающим его зависимости), так и как "fat jar" (со всеми включенными в него переходными зависимостями).
  • Создайте приложение драйвера с зависимостями компиляции в вашей библиотеке распределенного кода и в Apache Spark (с определенной версией)
  • Упакуйте приложение драйвера в толстый файл для развертывания в драйвере
  • Передайте правильную версию вашего распределенного кода в качестве значения spark.jars параметр при запуске SparkSession
  • Передайте местоположение файла архива (например, gzip), содержащего все файлы lib/ папка загруженных бинарных файлов Spark в качестве значения spark.yarn.archive

В дополнение к очень обширному ответу, уже предоставленному пользователем 7337271, если проблема вызвана отсутствием внешних зависимостей, вы можете создать jar с вашими зависимостями, например, с помощью подключаемого модуля maven

В этом случае обязательно пометьте все зависимости искры ядра как "предоставленные" в вашей системе сборки и, как уже отмечалось, убедитесь, что они коррелируют с вашей версией искры времени выполнения.

Классы зависимостей вашего приложения должны быть указаны в опции application-jar вашей команды запуска.

Более подробную информацию можно найти в документации Spark.

Взято из документации:

application-jar: путь к пакетному jar, включающему ваше приложение и все зависимости. URL должен быть глобально видимым внутри вашего кластера, например, путь hdfs:// или путь file://, который присутствует на всех узлах

Добавьте в проект все файлы jar из spark-2.4.0-bin-hadoop2.7\spark-2.4.0-bin-hadoop2.7\jars. Спарк-2.4.0-bin-hadoop2.7 можно загрузить с https://spark.apache.org/downloads.html

Я думаю эту проблему должен решить сборочный плагин. Вам нужно построить толстую банку. Например, в sbt:

  • добавить файл $PROJECT_ROOT/project/assembly.sbt с кодом addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
  • для сборки.sbtadded some librarieslibraryDependencies ++ = Seq ("com.some.company" %% "some-lib"% "1.0.0") `
  • в консоли sbt введите "сборка" и разверните сборку jar

Если вам нужна дополнительная информация, перейдите по https://github.com/sbt/sbt-assembly

У меня есть следующий build.sbt

lazy val root = (project in file(".")).
  settings(
    name := "spark-samples",
    version := "1.0",
    scalaVersion := "2.11.12",
    mainClass in Compile := Some("StreamingExample")        
  )

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "com.couchbase.client" %% "spark-connector" % "2.2.0" 
)

// META-INF discarding
assemblyMergeStrategy in assembly := {
       case PathList("META-INF", xs @ _*) => MergeStrategy.discard
       case x => MergeStrategy.first
   }

Я создал толстую флягу своего приложения, используя плагин сборки sbt, но при запуске с использованием spark-submit он завершается с ошибкой:

java.lang.NoClassDefFoundError: rx/Completable$OnSubscribe
    at com.couchbase.spark.connection.CouchbaseConnection.streamClient(CouchbaseConnection.scala:154)

Я вижу, что класс существует в моей толстой банке:

jar tf target/scala-2.11/spark-samples-assembly-1.0.jar | grep 'Completable$OnSubscribe'
rx/Completable$OnSubscribe.class

не уверен, что мне здесь не хватает, какие-нибудь подсказки?

Другие вопросы по тегам