Как сохранить фрейм данных Kafka-Spark Streaming в один файл
Как сохранить фрейм данных Kafka-Spark Streaming в один файл
Я разработал приложение, которое будет принимать сообщения, используя процесс Kafka-Spark Streaming.
Как только данные получены, они преобразуются в фрейм данных.
Затем фрейм потоковых данных сохраняется в виде текстового файла, здесь фрейм данных сохраняется в каждом файле для каждого сообщения потока kafka, ниже приведен код, который я использовал для сохранения в качестве фрейма данных в текстовом файле, это сохраняет данные в многократный текст файл для каждого сообщения.
DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append")
.save("path")
Здесь требование, которое я хотел бы выполнить, - это сохранение даты в потоковом формате в виде отдельного файла для каждого сообщения kafka, если возможно, пожалуйста, помогите мне с решением.
заранее спасибо
1 ответ
Ниже код может помочь вам. Просто создайте список RDD и затем объедините его.
var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
{
dStreamRDDList += rdd
})
val joinRDD = ssc.sparkContext.union(dStreamRDDList)
//then convert joinRDD to DataFrame (DF)
DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append")
.save("path")