Как подключиться к серверу удаленного улья из спарка

Я использую spark локально и хочу получить доступ к таблицам Hive, которые расположены в удаленном кластере Hadoop.

Я могу получить доступ к таблицам улья, запустив beeline под SPARK_HOME

[ml@master spark-2.0.0]$./bin/beeline 
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://remote_hive:10000
Connecting to jdbc:hive2://remote_hive:10000
Enter username for jdbc:hive2://remote_hive:10000: root
Enter password for jdbc:hive2://remote_hive:10000: ******
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/ml/spark/spark-2.0.0/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/10/12 19:06:39 INFO jdbc.Utils: Supplied authorities: remote_hive:10000
16/10/12 19:06:39 INFO jdbc.Utils: Resolved authority: remote_hive:10000
16/10/12 19:06:39 INFO jdbc.HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://remote_hive:10000
Connected to: Apache Hive (version 1.2.1000.2.4.2.0-258)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://remote_hive:10000>

как я могу получить доступ к таблицам удаленного куста программно из спарк?

1 ответ

Решение

JDBC не требуется

Spark подключается напрямую к метасольве Hive, а не через HiveServer2. Чтобы настроить это,

  1. Положил hive-site.xml на ваше classpathи укажите hive.metastore.uriс того места, где находился ваш улей. Также см. Как подключиться к метасольве Hive программно в SparkSQL?

  2. Импортировать org.apache.spark.sql.hive.HiveContext, так как он может выполнять SQL-запрос по таблицам Hive.

  3. определять val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

  4. проверить sqlContext.sql("show tables") чтобы увидеть, работает ли это

SparkSQL на таблицах Hive

Вывод: если вы должны пойти с JDBC путь

Посмотрите, как удаленно соединяется Apache Spark с Apache Hive.

Обратите внимание, что Билайн также подключается через jdbc. из вашего журнала это само по себе очевидно.

[ml @ master spark-2.0.0] $. / bin / beeline Beeline версия 1.2.1.spark2 от Apache Hive beeline>!connect jdbc:hive2://remote_hive:10000

Подключение к jdbc: hive2: // remote_hive: 10000

Поэтому, пожалуйста, взгляните на эту интересную статью.

  • Способ 1: вытянуть таблицу в Spark с помощью JDBC
  • Способ 2: использование Spark JdbcRDD с драйвером Jived HiveServer2
  • Способ 3: выборка набора данных на стороне клиента, а затем создание RDD вручную

В настоящее время драйвер HiveServer2 не позволяет использовать "сверкающий" метод 1 и 2, мы можем полагаться только на метод 3

Ниже приведен пример фрагмента кода, который может быть достигнут

Загрузка данных из одного кластера Hadoop (он же "удаленный") в другой (где мой Spark живет как "домашний") через JDBC-соединение HiveServer2.

import java.sql.Timestamp
import scala.collection.mutable.MutableList

case class StatsRec (
  first_name: String,
  last_name: String,
  action_dtm: Timestamp,
  size: Long,
  size_p: Long,
  size_d: Long
)

val conn: Connection = DriverManager.getConnection(url, user, password)
val res: ResultSet = conn.createStatement
                   .executeQuery("SELECT * FROM stats_201512301914")
val fetchedRes = MutableList[StatsRec]()
while(res.next()) {
  var rec = StatsRec(res.getString("first_name"), 
     res.getString("last_name"), 
     Timestamp.valueOf(res.getString("action_dtm")), 
     res.getLong("size"), 
     res.getLong("size_p"), 
     res.getLong("size_d"))
  fetchedRes += rec
}
conn.close()
val rddStatsDelta = sc.parallelize(fetchedRes)
rddStatsDelta.cache()




 // Basically we are done. To check loaded data:

println(rddStatsDelta.count)
rddStatsDelta.collect.take(10).foreach(println)

После предоставления конфигурации hive-ste.xml для SPARK и после запуска службы HIVE Metastore,

В сеансе SPARK при подключении к HIVE необходимо настроить две вещи:

  1. Поскольку Spark SQL подключается к хранилищу метаданных Hive с использованием экономичности, нам необходимо предоставить URI экономичного сервера при создании сеанса Spark.
  2. Хранилище Hive Metastore - это каталог, в котором Spark SQL сохраняет таблицы. Используйте свойство 'spark.sql.warehouse.dir', которое соответствует 'hive.metastore.warehouse.dir' (поскольку это устарело в Spark 2.0)

Что-то типа:

    SparkSession spark=SparkSession.builder().appName("Spark_SQL_5_Save To Hive").enableHiveSupport().getOrCreate();
    spark.sparkContext().conf().set("spark.sql.warehouse.dir", "/user/hive/warehouse");
    spark.sparkContext().conf().set("hive.metastore.uris", "thrift://localhost:9083");

Надеюсь, это было полезно!!

Согласно документации:

Обратите внимание, что свойство hive.metastore.warehouse.dir в hive-site.xml устарело, начиная с Spark 2.0.0. Вместо этого используйте spark.sql.warehouse.dir, чтобы указать расположение базы данных на складе по умолчанию.

Так что в SparkSession вам нужно указать spark.sql.uris вместо того hive.metastore.uris

    from pyspark.sql import SparkSession
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.uris", "thrift://<remote_ip>:9083") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sql("show tables").show()
Другие вопросы по тегам