Как передать hiveContext в качестве аргумента функции spark scala

Я создал hiveContext в main() функция в Scala и мне нужно пройти через параметры этого hiveContext для других функций это структура: object Project {def main(name: String): Int = {val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)...}def read (streamId: Int, hc:hiveContext): Array[Byte] = {...}def close (): Unit = {...}} но это не работает функция read() называется внутри main(),

любая идея?

2 ответа

Я объявляю hiveContext неявным, это работает для меня

implicit val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)

Определено в MyJob:

override def run(config: Config)(implicit sqlContext: SQLContext): Unit = ...

Но если вы не хотите, чтобы это неявно, это должно быть то же самое

val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)(sqlContext)

override def run(config: Config)(sqlContext: SQLContext): Unit = ...

Кроме того, ваша функция read должна получить HiveContext в качестве типа для параметра hc, а не hiveContext

def read (streamId: Int, hc:HiveContext): Array[Byte] = 

Я попробовал несколько вариантов, это то, что в конце концов у меня получилось..

object SomeName extends App {

val conf = new SparkConf()...
val sc = new SparkContext(conf)

implicit val sqlC = SQLContext.getOrCreate(sc)
getDF1(sqlC)

def getDF1(sqlCo: SQLContext): Unit = {
    val query1 =  SomeQuery here  
    val df1 = sqlCo.read.format("jdbc").options(Map("url" -> dbUrl,"dbtable" -> query1)).load.cache()

 //iterate through df1 and retrieve the 2nd DataFrame based on some values in the Row of the first DataFrame

  df1.foreach(x => {
    getDF2(x.getString(0), x.getDecimal(1).toString, x.getDecimal(3).doubleValue) (sqlCo)
  })     
}

def getDF2(a: String, b: String, c: Double)(implicit sqlCont: SQLContext) :  Unit = {
  val query2 = Somequery

  val sqlcc = SQLContext.getOrCreate(sc)
  //val sqlcc = sqlCont //Did not work for me. Also, omitting (implicit sqlCont: SQLContext) altogether did not work
  val df2 = sqlcc.read.format("jdbc").options(Map("url" -> dbURL, "dbtable" -> query2)).load().cache()
   .
   .
   .
 }
}

Примечание. В приведенном выше коде, если я опущу (неявный sqlCont: SQLContext) параметр в сигнатуре метода getDF2, он не будет работать. Я пробовал несколько других вариантов передачи sqlContext от одного метода к другому, он всегда давал мне NullPointerException или Task, не сериализуемое Excpetion. Хорошо, что в конечном итоге это сработало таким образом, и я мог извлечь параметры из строки DataFrame1 и использовать эти значения при загрузке DataFrame 2.

Другие вопросы по тегам