Строковый фильтр с использованием Spark UDF
input.csv:
200,300,889,767,9908,7768,9090
300,400,223,4456,3214,6675,333
234567890
123445667887
Что я хочу: читать входной файл и сравнивать с набором "123 200 300", если совпадение найдено, дает соответствующие данные 200 300 (из 1 строки ввода)
300 (из 2 строк ввода)
123 (из 4 строк ввода)
То, что я написал:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object sparkApp {
val conf = new SparkConf()
.setMaster("local")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
def parseLine(invCol: String) : RDD[String] = {
println(s"INPUT, $invCol")
val inv_rdd = sc.parallelize(Seq(invCol.toString))
val bs_meta_rdd = sc.parallelize(Seq("123,200,300"))
return inv_rdd.intersection(bs_meta_rdd)
}
def main(args: Array[String]) {
val filePathName = "hdfs://xxx/tmp/input.csv"
val rawData = sc.textFile(filePathName)
val datad = rawData.map{r => parseLine(r)}
}
}
Я получаю следующее исключение:
java.lang.NullPointerException
Подскажите пожалуйста где я ошибся
3 ответа
Для простого ответа, почему мы делаем это настолько сложным? В этом случае нам не требуется UDF.
Это ваши входные данные:
200,300,889,767,9908,7768,9090|AAA
300,400,223,4456,3214,6675,333|BBB
234,567,890|CCC
123,445,667,887|DDD
и вы должны сопоставить его с 123,200,300
val matchSet = "123,200,300".split(",").toSet
val rawrdd = sc.textFile("D:\\input.txt")
rawrdd.map(_.split("|"))
.map(arr => arr(0).split(",").toSet.intersect(matchSet).mkString(",") + "|" + arr(1))
.foreach(println)
Ваш вывод:
300,200|AAA
300|BBB
|CCC
123|DDD
Проблема решена. Это очень просто.
val pfile = sc.textFile("/FileStore/tables/6mjxi2uz1492576337920/input.csv")
case class pSchema(id: Int, pName: String)
val pDF = pfile.map(_.split("\t")).map(p => pSchema(p(0).toInt,p(1).trim())).toDF()
pDF.select("id","pName").show()
Определить UDF
val findP = udf((id: Int,
pName: String
) => {
val ids = Array("123","200","300")
var idsFound : String = ""
for (id <- ids){
if (pName.contains(id)){
idsFound = idsFound + id + ","
}
}
if (idsFound.length() > 0) {
idsFound = idsFound.substring(0,idsFound.length -1)
}
idsFound
})
Используйте UDF в withCoulmn()
pDF.select("id","pName").withColumn("Found",findP($"id",$"pName")).show()
То, что вы пытаетесь сделать, не может быть сделано так, как вы это делаете.
Spark не поддерживает вложенные RDD (см. SPARK-5063).
Spark не поддерживает вложенные RDD или действия Spark внутри преобразований; это обычно приводит к NullPointerException (см. SPARK-718 в качестве одного примера). Запутанный NPE - один из самых распространенных источников вопросов Spark о Stackru:
вызов отличного и карта вместе бросает NPE в искровой библиотеке
NullPointerException в Scala Spark, по-видимому, вызвано типом коллекции?
Graphx: у меня есть исключение NullPointerException внутри mapVertices (это всего лишь пример тех, на которые я лично ответил; есть много других).
Я думаю, что мы можем обнаружить эти ошибки, добавив логику в RDD, чтобы проверить, является ли sc нулевым (например, превратить sc в функцию получения); мы можем использовать это, чтобы добавить лучшее сообщение об ошибке.