Как зарегистрировать разные драйверы в коде из jar-файлов, переданных в команде spark-submit?
Я пытаюсь прочитать таблицу в базе данных Greenplum и для этого я написал следующий код: У меня есть файл настроек, позволяющий прочитать подробности соединения: testconnection.properties contents:
devUserName=gpUserName
devPassword=password
gpDriverClass=com.pivotal.jdbc.GreenplumDriver
gpDevUrl=jdbc:pivotal:greenplum://xx.xx.xxx.xxx:xxx;databaseName=abcd;ValidateServerCertificate=false;EncryptionMethod=requestSSL;MaxNumericScale=30;MaxNumericPrecision=40;
hiveMetaDriverClass=org.postgresql.Driver
hiveMetaDevUrl=jdbc:postgresql://hostname:portnumber/metastore?currentSchema=metadb
hiveMetaDevUser=metauser
hiveMetaDevpassword=metapassword
Код:
object YearPartition {
val conf = new SparkConf().setAppName("TEST_YEAR").set("spark.executor.heartbeatInterval", "1200s")
.set("spark.network.timeout", "12000s")
.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
.set("spark.shuffle.compress", "true")
.set("spark.sql.orc.filterPushdown", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.max", "512m")
.set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
.set("spark.streaming.stopGracefullyOnShutdown", "true")
val conFile = "testconnection.properties"
val properties = new Properties()
properties.load(new FileInputStream(conFile))
// GP Properties
val connectionUrl = properties.getProperty("gpDevUrl")
val devUserName = properties.getProperty("devUserName")
val devPassword = properties.getProperty("devPassword")
val driverClass = properties.getProperty("gpDriverClass")
// Hive Metastore properties
val metaDriver = properties.getProperty("hiveMetaDriverClass")
val hiveMetaConURL = properties.getProperty("hiveMetaDevUrl")
val metaUserName = properties.getProperty("hiveMetaDevUser")
val metaPassword = properties.getProperty("hiveMetaDevpassword")
def main(args: Array[String]): Unit = {
val flagCol = "del_flag"
var precisionColsText = new ListBuffer[String]
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
import spark.implicits._
val getColDetails = new ColumnDetails(spark, metaDriver, hiveMetaConURL, metaUserName, metaPassword)
try {
Class.forName(driverClass).newInstance()
}
catch {
case cnf: ClassNotFoundException =>
println("No class def found...")
System.exit(1)
case e: Exception =>
println("No class def found...")
System.exit(1)
}
val splitColumns = getColDetails.returnSplitColumns()
val textList = getColDetails.returnTextList()
val allColumns = getColDetails.returnAllColumns()
val dataMapper = getColDetails.returnDataMapper()
val partition_columns = getColDetails.returnPartitionColumns()
val pCols = precisionColsText.mkString(",")
def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String, dataMapper: Map[String, String], partition_columns: Array[String], spark: SparkSession): DataFrame = {
val colList = allColumns.split(",").toList
val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
val gpColSeq = allColumns.split(",").toSeq
val yearDF = spark.read.format("greenplum").option("url", connectionUrl)
.option("server.port","1234")
.option("dbtable", "gptable")
.option("dbschema","gpdb")
.option("user", devUserName)
.option("password", devPassword)
.option("partitionColumn","id")
.option("partitions",450)
.load()
.where("period_year=2017 and period_num=12")
.select(gpColSeq map col:_*)
.withColumn(flagCol, lit(0))
val totalCols: List[String] = splitColumns ++ textList
val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
val resultDF = yearDF.select(allCols: _*)
val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
val finalDF = stringColumns.foldLeft(resultDF) {
(tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
}
finalDF
}
val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
dataDF.write.format("csv").save("hdfs://devuser/apps/hive/warehouse/headersTable/header_test_data2.csv")
}
}
Чтобы получить Seq: gpColSeq, я написал другой класс: ColumnDetails для запроса таблицы метаданных улья, чтобы получить все необходимые метаданные, такие как имя столбца, тип столбца и т. Д.
class ColumnDetails(spark: SparkSession, metaDriver:String, hiveMetaConURL:String, metaUserName: String, metaPassword:String) {
var precisionColsText = new ListBuffer[String]
var textType = "character varying"
var textList = new ListBuffer[String]
try {
Class.forName(metaDriver).newInstance()
}
catch {
case cnf: ClassNotFoundException =>
println("No class def found.")
System.exit(1)
case e: Exception =>
println("No class def found.")
System.exit(1)
}
import spark.implicits._
val ddatatypes = spark.read.format("jdbc").option("url", hiveMetaConURL)
.option("dbtable", "(select source_type, hive_type from metadb.datatypes) as gpHiveDatadatatypes")
.option("user", metaUserName).option("password", metaPassword).load()
val spColsDF = spark.read.format("jdbc").option("url", hiveMetaConURL)
.option("dbtable", "(select source_columns, precision_columns, partition_columns from metadb.metadataTable where tablename='finance.gptable') as colsPrecision")
.option("user", metaUserName).option("password", metaPassword).load()
val dataMapper = ddatatypes.as[(String, String)].collect().toMap
val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
val gpColumns = gpCols.split("\\|").map(e => e.split("\\:")).map(s => s(0)).mkString(",")
val splitColumns = gpCols.split("\\|").toList
val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
val partCList = prtn_String_columns.split(",").map(x => col(x))
var splitPrecisionCols = precisionCols.split(",")
val pCols = precisionColsText.mkString(",")
val allColumns = gpColumns.concat("," + pCols)
def returnSplitColumns(): List[String] = {
gpCols.split("\\|").toList
}
def returnTextList(): ListBuffer[String] = {
for (i <- splitPrecisionCols) {
precisionColsText += i.concat(s"::${textType} as ").concat(s"${i}_text")
textList += s"${i}_text:${textType}"
}
textList
}
def returnAllColumns(): String = {
gpColumns.concat("," + pCols)
}
def returnDataMapper(): Map[String, String] = {
ddatatypes.as[(String, String)].collect().toMap
}
def returnPartitionColumns(): Array[String] = {
spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
}
}
Чтобы запросить таблицу метаданных, я дал драйвер как: org.postgresql.Driver в классе: ColumnDetails
Я отправляю jar, используя нижеприведенную команду spark-submit, где я передаю как greenplum.jar, так и postgres jar:
SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.YearPartition --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/hdpuser/jars/greenplum.jar,/home/hdpuser/jars/postgresql-42.1.4.jar --conf spark.jars=/home/hdpuser/jars/greenplum.jar,/home/hdpuser/jars/postgresql-42.1.4.jar --executor-cores 3 --executor-memory 13G --keytab /home/hdpuser/hdpuser.keytab --principal hdpuser@devuser.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/hdpuser/jars/greenplum.jar,/home/hdpuser/jars/postgresql-42.1.4.jar splinter_2.11-0.1.jar
Когда баночка отправлена, она заканчивается исключением:
"Exception in thread "main" java.sql.SQLException: No suitable driver" at the line:
val dtypes = spark.read.format("jdbc").option("url", hiveMetaConURL)
.option("dbtable", "(select source_type, hive_type from metadb.types) as gpHiveDataTypes")
.option("user", metaUserName).option("password", metaPassword).load()
Может ли кто-нибудь дать мне знать, как я могу передать разные jar-файлы в spark-submit для запроса разных баз данных. В этом случае, как я могу зарегистрировать два драйвера в двух разных классах из jar-файлов, переданных в spark-submit:
- Передайте флягу, чтобы получить метаданные из моей таблицы метаданных
- Добавьте еще одну банку для чтения данных из GP.
2 ответа
Флаг --jars разделен запятыми, но все пути классов используют двоеточия. Попробуй это:
SPARK_MAJOR_VERSION=2 spark-submit \
--class com.partition.source.YearPartition \
--master=yarn \
--conf spark.ui.port=4090 \
--driver-class-path /home/hdpuser/jars/greenplum.jar:/home/hdpuser/jars/postgresql-42.1.4.jar \
--conf spark.jars=/home/hdpuser/jars/greenplum.jar,/home/hdpuser/jars/postgresql-42.1.4.jar \
--executor-cores 3 \
--executor-memory 13G \
--keytab /home/hdpuser/hdpuser.keytab \
--principal hdpuser@devuser.COM \
--files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
--name Splinter \
--conf spark.executor.extraClassPath=/home/hdpuser/jars/greenplum.jar:/home/hdpuser/jars/postgresql-42.1.4.jar \
splinter_2.11-0.1.jar
Вы пробовали --jars включить все зависимые файлы JAR вместо того, что вы делаете сейчас?