Создание Dataframe внутри искровой структурированной потоковой передачи для forEachWriter для вставки в проблему с таблицей куду
У меня есть проблема, для которой я пытался найти решение, но не смог ничего найти и хотел бы получить любые * указатели, которые я могу получить.
Поэтому я пытаюсь интегрировать структурированную потоковую передачу Spark с Apache Kudu, я читаю поток из Kafka и выполняю некоторую обработку и теперь должен записывать в таблицы Kudu, проблема в том, что искровая структурированная потоковая передача не обеспечивает поддержку приемника Kudu (что Я знаю?), И я использую foreach writer, но как только пытаюсь создать фрейм данных внутри ForeachWriter.process(), он просто зависает и никогда не движется дальше
import org.apache.spark.sql.ForeachWriter
val foreachWriter = new ForeachWriter[Row] {
override def open(partitionId: Long,version: Long): Boolean = {
val mySchema = StructType(Array(
StructField("id", IntegerType),
StructField("value", DoubleType),
StructField("EventTimestamp", TimestampType)
))
true
}
override def process(value: Row): Unit = {
println("values\n------------------")
val spark = SparkSession.builder.appName("Spark-Kafka-Integrations").master("local").getOrCreate()
val valRDD=spark.sparkContext.parallelize(value.toSeq)
val valRDF=valRDD.map(x=>x.toString.split(",").to[List])
println(value)
val valDF=spark.createDataFrame(valRDF)
valDF.show()
println("End values\n///////////////////")
//shoud insert into kudu here
}
override def close(errorOrNull: Throwable): Unit = {
}
}
//count is a Dstream/streaming dataframe
count.writeStream.foreach(foreachWriter).outputMode("complete") .option("truncate", "false").start().awaitTermination()