Как передать hiveContext в качестве аргумента функции spark scala
Я создал hiveContext
в main()
функция в Scala и мне нужно пройти через параметры этого hiveContext
для других функций это структура:
object Project {
def main(name: String): Int = {
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
...
}
def read (streamId: Int, hc:hiveContext): Array[Byte] = {
...
}
def close (): Unit = {
...
}
}
но это не работает функция read()
называется внутри main()
,
любая идея?
2 ответа
Я объявляю hiveContext неявным, это работает для меня
implicit val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)
Определено в MyJob:
override def run(config: Config)(implicit sqlContext: SQLContext): Unit = ...
Но если вы не хотите, чтобы это неявно, это должно быть то же самое
val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)(sqlContext)
override def run(config: Config)(sqlContext: SQLContext): Unit = ...
Кроме того, ваша функция read должна получить HiveContext в качестве типа для параметра hc, а не hiveContext
def read (streamId: Int, hc:HiveContext): Array[Byte] =
Я попробовал несколько вариантов, это то, что в конце концов у меня получилось..
object SomeName extends App {
val conf = new SparkConf()...
val sc = new SparkContext(conf)
implicit val sqlC = SQLContext.getOrCreate(sc)
getDF1(sqlC)
def getDF1(sqlCo: SQLContext): Unit = {
val query1 = SomeQuery here
val df1 = sqlCo.read.format("jdbc").options(Map("url" -> dbUrl,"dbtable" -> query1)).load.cache()
//iterate through df1 and retrieve the 2nd DataFrame based on some values in the Row of the first DataFrame
df1.foreach(x => {
getDF2(x.getString(0), x.getDecimal(1).toString, x.getDecimal(3).doubleValue) (sqlCo)
})
}
def getDF2(a: String, b: String, c: Double)(implicit sqlCont: SQLContext) : Unit = {
val query2 = Somequery
val sqlcc = SQLContext.getOrCreate(sc)
//val sqlcc = sqlCont //Did not work for me. Also, omitting (implicit sqlCont: SQLContext) altogether did not work
val df2 = sqlcc.read.format("jdbc").options(Map("url" -> dbURL, "dbtable" -> query2)).load().cache()
.
.
.
}
}
Примечание. В приведенном выше коде, если я опущу (неявный sqlCont: SQLContext) параметр в сигнатуре метода getDF2, он не будет работать. Я пробовал несколько других вариантов передачи sqlContext от одного метода к другому, он всегда давал мне NullPointerException или Task, не сериализуемое Excpetion. Хорошо, что в конечном итоге это сработало таким образом, и я мог извлечь параметры из строки DataFrame1 и использовать эти значения при загрузке DataFrame 2.