Как зарегистрировать разные драйверы в коде из jar-файлов, переданных в команде spark-submit?

Я пытаюсь прочитать таблицу в базе данных Greenplum и для этого я написал следующий код: У меня есть файл настроек, позволяющий прочитать подробности соединения: testconnection.properties contents:

devUserName=gpUserName
devPassword=password
gpDriverClass=com.pivotal.jdbc.GreenplumDriver
gpDevUrl=jdbc:pivotal:greenplum://xx.xx.xxx.xxx:xxx;databaseName=abcd;ValidateServerCertificate=false;EncryptionMethod=requestSSL;MaxNumericScale=30;MaxNumericPrecision=40;
hiveMetaDriverClass=org.postgresql.Driver
hiveMetaDevUrl=jdbc:postgresql://hostname:portnumber/metastore?currentSchema=metadb
hiveMetaDevUser=metauser
hiveMetaDevpassword=metapassword

Код:

object YearPartition {
  val conf = new SparkConf().setAppName("TEST_YEAR").set("spark.executor.heartbeatInterval", "1200s")
    .set("spark.network.timeout", "12000s")
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .set("spark.shuffle.compress", "true")
    .set("spark.sql.orc.filterPushdown", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.max", "512m")
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
  val conFile = "testconnection.properties"
  val properties = new Properties()
  properties.load(new FileInputStream(conFile))

  // GP Properties
  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName   = properties.getProperty("devUserName")
  val devPassword   = properties.getProperty("devPassword")
  val driverClass   = properties.getProperty("gpDriverClass")
  // Hive Metastore properties
  val metaDriver      = properties.getProperty("hiveMetaDriverClass")
  val hiveMetaConURL  = properties.getProperty("hiveMetaDevUrl")
  val metaUserName    = properties.getProperty("hiveMetaDevUser")
  val metaPassword    = properties.getProperty("hiveMetaDevpassword")

  def main(args: Array[String]): Unit = {
    val flagCol   = "del_flag"
    var precisionColsText = new ListBuffer[String]
    val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
    import spark.implicits._
    val getColDetails = new ColumnDetails(spark, metaDriver, hiveMetaConURL, metaUserName, metaPassword)
    try {
      Class.forName(driverClass).newInstance()
    }
    catch {
      case cnf: ClassNotFoundException =>
        println("No class def found...")
        System.exit(1)
      case e: Exception =>
        println("No class def found...")
        System.exit(1)
    }
    val splitColumns      = getColDetails.returnSplitColumns()
    val textList          = getColDetails.returnTextList()
    val allColumns        = getColDetails.returnAllColumns()
    val dataMapper        = getColDetails.returnDataMapper()
    val partition_columns = getColDetails.returnPartitionColumns()
    val pCols             = precisionColsText.mkString(",")

    def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String, dataMapper: Map[String, String], partition_columns: Array[String], spark: SparkSession): DataFrame = {
      val colList = allColumns.split(",").toList
      val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
      val gpColSeq = allColumns.split(",").toSeq
      val yearDF = spark.read.format("greenplum").option("url", connectionUrl)
        .option("server.port","1234")
        .option("dbtable", "gptable")
        .option("dbschema","gpdb")
        .option("user", devUserName)
        .option("password", devPassword)
        .option("partitionColumn","id")
        .option("partitions",450)
        .load()
        .where("period_year=2017 and period_num=12")
        .select(gpColSeq map col:_*)
        .withColumn(flagCol, lit(0))
      val totalCols: List[String] = splitColumns ++ textList
      val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
      val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
      val resultDF = yearDF.select(allCols: _*)
      val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
      val finalDF = stringColumns.foldLeft(resultDF) {
        (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
      }
      finalDF
    }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    dataDF.write.format("csv").save("hdfs://devuser/apps/hive/warehouse/headersTable/header_test_data2.csv")
  }
}

Чтобы получить Seq: gpColSeq, я написал другой класс: ColumnDetails для запроса таблицы метаданных улья, чтобы получить все необходимые метаданные, такие как имя столбца, тип столбца и т. Д.

class ColumnDetails(spark: SparkSession, metaDriver:String, hiveMetaConURL:String, metaUserName: String, metaPassword:String) {
  var precisionColsText = new ListBuffer[String]
  var textType          = "character varying"
  var textList          = new ListBuffer[String]
  try {
    Class.forName(metaDriver).newInstance()
  }
  catch {
    case cnf: ClassNotFoundException =>
      println("No class def found.")
      System.exit(1)
    case e: Exception =>
      println("No class def found.")
      System.exit(1)
  }
  import spark.implicits._
  val ddatatypes   = spark.read.format("jdbc").option("url", hiveMetaConURL)
    .option("dbtable", "(select source_type, hive_type from metadb.datatypes) as gpHiveDatadatatypes")
    .option("user", metaUserName).option("password", metaPassword).load()
  val spColsDF = spark.read.format("jdbc").option("url", hiveMetaConURL)
    .option("dbtable", "(select source_columns, precision_columns, partition_columns from metadb.metadataTable where tablename='finance.gptable') as colsPrecision")
    .option("user", metaUserName).option("password", metaPassword).load()
  val dataMapper          = ddatatypes.as[(String, String)].collect().toMap
  val gpCols              = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
  val gpColumns           = gpCols.split("\\|").map(e => e.split("\\:")).map(s => s(0)).mkString(",")
  val splitColumns        = gpCols.split("\\|").toList
  val precisionCols       = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
  val partition_columns   = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
  val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
  val partCList           = prtn_String_columns.split(",").map(x => col(x))
  var splitPrecisionCols  = precisionCols.split(",")
  val pCols               = precisionColsText.mkString(",")
  val allColumns          = gpColumns.concat("," + pCols)
  def returnSplitColumns(): List[String] = {
    gpCols.split("\\|").toList
  }
  def returnTextList(): ListBuffer[String] =  {
    for (i <- splitPrecisionCols) {
      precisionColsText += i.concat(s"::${textType} as ").concat(s"${i}_text")
      textList += s"${i}_text:${textType}"
    }
    textList
  }
  def returnAllColumns(): String =  {
    gpColumns.concat("," + pCols)
  }
  def returnDataMapper(): Map[String, String] = {
    ddatatypes.as[(String, String)].collect().toMap
  }
  def returnPartitionColumns(): Array[String] = {
    spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
  }
}

Чтобы запросить таблицу метаданных, я дал драйвер как: org.postgresql.Driver в классе: ColumnDetails

Я отправляю jar, используя нижеприведенную команду spark-submit, где я передаю как greenplum.jar, так и postgres jar:

SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.YearPartition --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/hdpuser/jars/greenplum.jar,/home/hdpuser/jars/postgresql-42.1.4.jar --conf spark.jars=/home/hdpuser/jars/greenplum.jar,/home/hdpuser/jars/postgresql-42.1.4.jar --executor-cores 3 --executor-memory 13G --keytab /home/hdpuser/hdpuser.keytab --principal hdpuser@devuser.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/hdpuser/jars/greenplum.jar,/home/hdpuser/jars/postgresql-42.1.4.jar splinter_2.11-0.1.jar

Когда баночка отправлена, она заканчивается исключением:

"Exception in thread "main" java.sql.SQLException: No suitable driver" at the line:
  val dtypes   = spark.read.format("jdbc").option("url", hiveMetaConURL)
    .option("dbtable", "(select source_type, hive_type from metadb.types) as gpHiveDataTypes")
    .option("user", metaUserName).option("password", metaPassword).load()

Может ли кто-нибудь дать мне знать, как я могу передать разные jar-файлы в spark-submit для запроса разных баз данных. В этом случае, как я могу зарегистрировать два драйвера в двух разных классах из jar-файлов, переданных в spark-submit:

  1. Передайте флягу, чтобы получить метаданные из моей таблицы метаданных
  2. Добавьте еще одну банку для чтения данных из GP.

2 ответа

Флаг --jars разделен запятыми, но все пути классов используют двоеточия. Попробуй это:

SPARK_MAJOR_VERSION=2 spark-submit \
    --class com.partition.source.YearPartition \
    --master=yarn \
    --conf spark.ui.port=4090 \
    --driver-class-path /home/hdpuser/jars/greenplum.jar:/home/hdpuser/jars/postgresql-42.1.4.jar \
    --conf spark.jars=/home/hdpuser/jars/greenplum.jar,/home/hdpuser/jars/postgresql-42.1.4.jar \
    --executor-cores 3 \
    --executor-memory 13G \
    --keytab /home/hdpuser/hdpuser.keytab \
    --principal hdpuser@devuser.COM \
    --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
    --name Splinter \
    --conf spark.executor.extraClassPath=/home/hdpuser/jars/greenplum.jar:/home/hdpuser/jars/postgresql-42.1.4.jar \
    splinter_2.11-0.1.jar

Вы пробовали --jars включить все зависимые файлы JAR вместо того, что вы делаете сейчас?

Другие вопросы по тегам