Как получить доступ к файлам s3a:// из Apache Spark?

Hadoop 2.6 не поддерживает s3a из коробки, поэтому я попробовал ряд решений и исправлений, в том числе:

развернуть с помощью hadoop-aws и aws-java-sdk => невозможно прочитать переменную среды для учетных данных, добавить hadoop-aws в maven => различные конфликты переходных зависимостей

Кто-нибудь успешно заставил оба работать?

12 ответов

Испытав на собственном опыте разницу между s3a и s3n - 7,9 ГБ данных, передаваемых по s3a, составляли ~7 минут, в то время как 7,9 ГБ данных по s3n заняли 73 минуты [us-east-1 to us-west-1, к сожалению, в обоих случаях; Redshift и Lambda, которые сейчас используют us-east-1], это очень важная часть стека, которую нужно исправить, и она того стоит.

Вот ключевые части, по состоянию на декабрь 2015 года:

  1. Вашему кластеру Spark понадобится Hadoop версии 2.x или выше. Если вы используете установочные скрипты Spark EC2 и, возможно, пропустили его, переключение на использование чего-то отличного от 1.0 - указать --hadoop-major-version 2 (который использует CDH 4.2 на момент написания статьи).

  2. Вам нужно будет включить то, что на первый взгляд может показаться устаревшей библиотекой AWS SDK (построено в 2014 году как версия 1.7.4) для версий Hadoop, начиная с 2.7.1 (стабильная версия): aws-java-sdk 1.7 +0,4. Насколько я могу судить, использование этого вместе с конкретными JAR-файлами AWS SDK для 1.10.8 ничего не сломало.

  3. Вам также понадобится JAR hadoop-aws 2.7.1 на пути к классам. Этот JAR содержит класс org.apache.hadoop.fs.s3a.S3AFileSystem,

  4. В spark.properties Вы, вероятно, хотите, чтобы некоторые настройки выглядели так:

    spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem  
    spark.hadoop.fs.s3a.access.key=ACCESSKEY  
    spark.hadoop.fs.s3a.secret.key=SECRETKEY
    

Я подробно описал этот список в посте, который я написал, когда проходил этот процесс. Кроме того, я рассмотрел все исключительные случаи, с которыми я столкнулся по пути, и то, что я считаю причиной каждого и как их исправить.

Я пишу этот ответ для доступа к файлам с S3A из Spark 2.0.1 на Hadoop 2.7.3

Скопируйте баночки с AWS (hadoop-aws-2.7.3.jar а также aws-java-sdk-1.7.4.jar) который поставляется с Hadoop по умолчанию

  • Подсказка: если места банок не уверены? может помочь команда find как привилегированный пользователь, команды могут быть..

     find / -name hadoop-aws*.jar
     find / -name aws-java-sdk*.jar
    

в искровой путь, который содержит все искровые банки

  • Подсказка: мы не можем напрямую указать местоположение (оно должно быть в файле свойств), так как я хочу сделать общий ответ для дистрибутивов и разновидностей Linux. classpath искры может быть идентифицирован командой поиска ниже

     find / -name spark-core*.jar
    

в spark-defaults.conf

Подсказка: (В основном это будет помещено в /etc/spark/conf/spark-defaults.conf)

#make sure jars are added to CLASSPATH
spark.yarn.jars=file://{spark/home/dir}/jars/*.jar,file://{hadoop/install/dir}/share/hadoop/tools/lib/*.jar


spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem  
spark.hadoop.fs.s3a.access.key={s3a.access.key} 
spark.hadoop.fs.s3a.secret.key={s3a.secret.key} 
#you can set above 3 properties in hadoop level `core-site.xml` as well by removing spark prefix.

в свечу входят банки (aws-java-sdk а также hadoop-aws) в --driver-class-path если нужно.

spark-submit --master yarn \
  --driver-class-path {spark/jars/home/dir}/aws-java-sdk-1.7.4.jar \
  --driver-class-path {spark/jars/home/dir}/hadoop-aws-2.7.3.jar \
  other options

Замечания:

Убедитесь, что пользователь Linux с правами чтения, прежде чем запускать find Команда для предотвращения ошибки В доступе отказано

Я получил его с помощью готового бинарного файла Spark 1.4.1 с hadoop 2.6. Убедитесь, что вы установили оба spark.driver.extraClassPath а также spark.executor.extraClassPath указание на два jar-файла (hadoop-aws и aws-java-sdk). Если вы работаете в кластере, убедитесь, что ваши исполнители имеют доступ к файлам jar в кластере.

Мы используем spark 1.6.1 с Mesos, и у нас было много проблем с записью в S3 от spark. Я отдаю должное cfeduke за ответ. Небольшое изменение я сделал, добавив maven координаты в конфигурацию spark.jar в файле spark-defaults.conf. Я попытался с hadoop-aws:2.7.2, но все еще получал много ошибок, поэтому мы вернулись к 2.7.1. Ниже приведены изменения в spark-defaults.conf, которые работают для нас:

spark.jars.packages             net.java.dev.jets3t:jets3t:0.9.0,com.google.guava:guava:16.0.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1
spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key  <MY ACCESS KEY>
spark.hadoop.fs.s3a.secret.key  <MY SECRET KEY>
spark.hadoop.fs.s3a.fast.upload true

Спасибо, cfeduke, что нашли время написать свой пост. Это было очень полезно.

Вот подробности от октября 2016 года, представленные на Spark Summit EU: Apache Spark и Object Store.

Ключевые моменты

  • Коммиттер прямого вывода ушел из Spark 2.0 из-за риска / опыта повреждения данных.
  • В FileOutputCommitter есть некоторые настройки для уменьшения переименований, но не для их устранения.
  • Я работаю с некоторыми коллегами над коммиттером O(1), полагаясь на Apache Dynamo, чтобы обеспечить нам необходимую согласованность.
  • Чтобы использовать S3a, получите правильный путь к классам.
  • И быть на Hadoop 2.7.z; 2.6.x были некоторые проблемы, которые были решены к тому времени HADOOP-11571.
  • Под SPARK-7481 есть пиар, чтобы вывести все в искрометный дистрибутив, который вы создаете сами. В противном случае, спросите, кто бы ни поставлял в двоичные файлы, чтобы сделать работу.
  • Hadoop 2.8 собирается добавить значительные улучшения в HADOOP-11694.

Размещение продукта: сторона чтения-производительности HADOOP-11694 включена в HDP2.5; Там может представлять интерес документация Spark и S3, особенно варианты настройки.

Используя Spark 1.4.1, предварительно собранный с Hadoop 2.6, я могу заставить s3a:// работать при развертывании в автономный кластер Spark, добавив jar-файлы hadoop-aws и aws-java-sdk из Hadoop 2.7.1 дистрибутив (находится в $HADOOP_HOME/share/hadoop/tools/lib Hadoop 2.7.1) в моей переменной среды SPARK_CLASSPATH в моем файле $SPARK_HOME/conf/spark-env.sh.

Как вы сказали, hadoop 2.6 не поддерживает s3a, а последняя версия spark 1.6 1.6 не поддерживает hadoop 2.7, но у spark 2.0 нет проблем с hadoop 2.7 и s3a.

для spark 1.6.x мы сделали несколько грязных взломов с драйвером s3 от EMR... вы можете посмотреть этот документ: https://github.com/zalando/spark-appliance

если вы все еще хотите попробовать использовать s3a в spark 1.6.x, обратитесь к ответу здесь: /questions/13673761/pyspark-ispolzuet-roli-iam-dlya-dostupa-k-s3/13673776#13673776

Вы также можете добавить зависимости S3A в путь к классам, используя spark-defaults.conf,

Пример:

spark.driver.extraClassPath     /usr/local/spark/jars/hadoop-aws-2.7.5.jar
spark.executor.extraClassPath   /usr/local/spark/jars/hadoop-aws-2.7.5.jar
spark.driver.extraClassPath     /usr/local/spark/jars/aws-java-sdk-1.7.4.jar
spark.executor.extraClassPath   /usr/local/spark/jars/aws-java-sdk-1.7.4.jar

Или просто:

spark.jars     /usr/local/spark/jars/hadoop-aws-2.7.5.jar,/usr/local/spark/jars/aws-java-sdk-1.7.4.jar

Просто убедитесь, что ваша версия AWS SDK соответствует версии Hadoop. Для получения дополнительной информации об этом, посмотрите на этот ответ: Невозможно получить доступ к данным S3, используя Spark 2.2

Вот решение для pyspark (возможно, с прокси):

def _configure_s3_protocol(spark, proxy=props["proxy"]["host"], port=props["proxy"]["port"], endpoint=props["s3endpoint"]["irland"]):
    """
    Configure access to the protocol s3
    https://sparkour.urizone.net/recipes/using-s3/
    AWS Regions and Endpoints
    https://docs.aws.amazon.com/general/latest/gr/rande.html
    """
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",  os.environ.get("AWS_ACCESS_KEY_ID"))
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ.get("AWS_SECRET_ACCESS_KEY"))
    sc._jsc.hadoopConfiguration().set("fs.s3a.proxy.host", proxy)
    sc._jsc.hadoopConfiguration().set("fs.s3a.proxy.port", port)
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", endpoint)
    return spark

Вы можете использовать s3n:// и изменить коммиттер вывода spark на тот, который напрямую выводит файл (см. Этот фрагмент)

Вот версия scala , которая отлично работает со Spark 3.2.1 (предварительно собранной) с Hadoop 3.3.1 , получая доступ к корзине S3 с машины, отличной от AWS [обычно локальная установка на машине разработчика]

сбт

       libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "3.2.1" % "provided",
    "org.apache.spark" %% "spark-streaming" % "3.2.1" % "provided",
    "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided",
    "org.apache.hadoop" % "hadoop-aws" % "3.3.1",
    "org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided"
  )

искровая программа

        val spark = SparkSession
    .builder()
    .master("local")
    .appName("Process parquet file")
    .config("spark.hadoop.fs.s3a.path.style.access", true)
    .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", ENDPOINT)
    .config(
      "spark.hadoop.fs.s3a.impl",
      "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    // The enable V4 does not seem necessary for the eu-west-3 region
    // see @stevel comment below 
    // .config("com.amazonaws.services.s3.enableV4", true)
    // .config(
    //  "spark.driver.extraJavaOptions",
    //  "-Dcom.amazonaws.services.s3.enableV4=true"
    // )
    .config("spark.executor.instances", "4")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val df = spark.read.parquet("s3a://[BUCKET NAME]/.../???.parquet")
  df.show()

примечание : регион находится в форме s3.[REGION].amazonaws.comнапример s3.eu-west-3.amazonaws.com

конфигурация s3

Чтобы сделать корзину доступной из-за пределов AWS, добавьте политику корзины в следующем виде:

      {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::[ACCOUNT ID]:user/[IAM USERNAME]"
            },
            "Action": [
                "s3:Delete*",
                "s3:Get*",
                "s3:List*",
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::[BUCKET NAME]/*"
        }
    ]
}

Предоставленные ACCESS_KEY и SECRET_KEY для конфигурации spark должны быть такими же, как у пользователя IAM, настроенного для корзины.

Я использую версию 2.3, и когда я сохраняю набор данных с помощью spark, например:

dataset.write().format("hive").option("fileFormat", "orc").mode(SaveMode.Overwrite)
    .option("path", "s3://reporting/default/temp/job_application")
    .saveAsTable("job_application");

Он отлично работает и сохраняет мои данные в s3.

Другие вопросы по тегам