Как сохранить фрейм данных Kafka-Spark Streaming в один файл

Как сохранить фрейм данных Kafka-Spark Streaming в один файл

Я разработал приложение, которое будет принимать сообщения, используя процесс Kafka-Spark Streaming.

Как только данные получены, они преобразуются в фрейм данных.

Затем фрейм потоковых данных сохраняется в виде текстового файла, здесь фрейм данных сохраняется в каждом файле для каждого сообщения потока kafka, ниже приведен код, который я использовал для сохранения в качестве фрейма данных в текстовом файле, это сохраняет данные в многократный текст файл для каждого сообщения.

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append")
                              .save("path")

Здесь требование, которое я хотел бы выполнить, - это сохранение даты в потоковом формате в виде отдельного файла для каждого сообщения kafka, если возможно, пожалуйста, помогите мне с решением.

заранее спасибо

1 ответ

Ниже код может помочь вам. Просто создайте список RDD и затем объедините его.

var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
    {
        dStreamRDDList += rdd
    })
val joinRDD = ssc.sparkContext.union(dStreamRDDList)
//then convert joinRDD to DataFrame (DF)
DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append")
                          .save("path")
Другие вопросы по тегам