Scala: запись в файл внутри foreachRDD
Я использую потоковую передачу Spark для обработки данных, поступающих с Kafka. И я хотел бы записать результат в файл (на локальном). Когда я печатаю на консоли, все работает нормально, и я получаю свои результаты, но когда я пытаюсь записать это в файл, я получаю ошибку.
я использую PrintWriter
чтобы сделать это, но я получаю эту ошибку:
Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
java.io.PrintWriter
Serialization stack:
- object not serializable (class: java.io.PrintWriter, value: java.io.PrintWriter@20f6f88c)
- field (class: streaming.followProduction$$anonfun$main$1, name: qualityWriter$1, type: class java.io.PrintWriter)
- object (class streaming.followProduction$$anonfun$main$1, <function1>)
- field (class: streaming.followProduction$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class streaming.followProduction$$anonfun$main$1)
- object (class streaming.followProduction$$anonfun$main$1$$anonfun$apply$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData,
Я думаю, что я не могу использовать писателя в ForeachRDD!
Вот мой код:
object followProduction extends Serializable {
def main(args: Array[String]) = {
val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
qualityWriter.append("dateTime , quality , status \n")
val sparkConf = new SparkConf().setMaster("spark://address:7077").setAppName("followProcess").set("spark.streaming.concurrentJobs", "4")
val sc = new StreamingContext(sparkConf, Seconds(10))
sc.checkpoint("checkpoint")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "address:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> s"${UUID.randomUUID().toString}",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("A", "C")
topics.foreach(t => {
val stream = KafkaUtils.createDirectStream[String, String](
sc,
PreferConsistent,
Subscribe[String, String](Array(t), kafkaParams)
)
stream.foreachRDD(rdd => {
rdd.collect().foreach(i => {
val record = i.value()
val newCsvRecord = process(t, record)
println(newCsvRecord)
qualityWriter.append(newCsvRecord)
})
})
})
qualityWriter.close()
sc.start()
sc.awaitTermination()
}
var componentQuantity: componentQuantity = new componentQuantity("", 0.0, 0.0, 0.0)
var diskQuality: diskQuality = new diskQuality("", 0.0)
def process(topic: String, record: String): String = topic match {
case "A" => componentQuantity.checkQuantity(record)
case "C" => diskQuality.followQuality(record)
}
}
У меня есть этот класс, я звоню:
case class diskQuality(datetime: String, quality: Double) extends Serializable {
def followQuality(record: String): String = {
val dateFormat: SimpleDateFormat = new SimpleDateFormat("dd-mm-yyyy hh:mm:ss")
var recQuality = msgParse(record).quality
var date: Date = dateFormat.parse(msgParse(record).datetime)
var recDateTime = new SimpleDateFormat("dd-mm-yyyy hh:mm:ss").format(date)
// some operations here
return recDateTime + " , " + recQuality
}
def msgParse(value: String): diskQuality = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val res = parse(value).extract[diskQuality]
return res
}
}
Как мне этого добиться? Я новичок и в Spark, и в Scala, так что, возможно, я не все делаю правильно. Спасибо за ваше время
РЕДАКТИРОВАТЬ:
Я изменил свой код и больше не получаю эту ошибку. Но в то же время у меня есть только первая строка в моем файле, и записи не добавляются. Писатель (handleWriter) внутри фактически не работает.
Вот мой код:
stream.foreachRDD(rdd => {
val qualityWriter = new PrintWriter(file)
qualityWriter.write("dateTime , quality , status \n")
qualityWriter.close()
rdd.collect().foreach(i =>
{
val record = i.value()
val newCsvRecord = process(topic , record)
val handleWriter = new PrintWriter(file)
handleWriter.append(newCsvRecord)
handleWriter.close()
println(newCsvRecord)
})
})
Где я скучал? Может быть, я делаю это неправильно...
2 ответа
PrintWriter
является локальным ресурсом, связанным с одной машиной и не может быть сериализован.
Чтобы удалить этот объект из плана сериализации Java, мы можем объявить его @transient
, Это означает, что форма сериализации followProduction
объект не будет пытаться сериализовать это поле.
В коде вопроса это должно быть объявлено как:
@transient val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
Тогда становится возможным использовать его в рамках foreachRDD
закрытие.
Но этот процесс не решает проблемы, связанные с правильной обработкой файла. qualityWriter.close()
будет выполнен при первом проходе потокового задания, а файловый дескриптор будет закрыт для записи во время выполнения задания. Чтобы правильно использовать местные ресурсы, такие как File
Я бы последовал предложению Ювала воссоздать PrintWriter в foreachRDD
закрытие. Недостающий кусок объявляет новый PrintWritter
в режиме добавления. Измененный код в пределах foreachRDD
будет выглядеть так (внесение дополнительных изменений в код):
// Initialization phase
val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
qualityWriter.println("dateTime , quality , status")
qualityWriter.close()
....
dstream.foreachRDD{ rdd =>
val data = rdd.map(e => e.value())
.collect() // get the data locally
.map(i=> process(topic , i)) // create csv records
val allRecords = data.mkString("\n") // why do I/O if we can do in-mem?
val handleWriter = new PrintWriter(file, append=true)
handleWriter.append(allRecords)
handleWriter.close()
}
Несколько замечаний о коде в вопросе:
"spark.streaming.concurrentJobs", "4"
Это создаст проблему с записью нескольких потоков в один и тот же локальный файл. Это, вероятно, также неправильно используется в этом контексте.
sc.checkpoint ("контрольная точка")
Кажется, нет необходимости в контрольных точках на этой работе.
Самое простое, что можно сделать, это создать экземпляр PrintWriter
внутри foreachRDD
, это означает, что оно не будет захвачено закрытием функции:
stream.foreachRDD(rdd => {
val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
qualityWriter.append("dateTime , quality , status \n")
rdd.collect().foreach(i => {
val record = i.value()
val newCsvRecord = process(t, record)
qualityWriter.append(newCsvRecord)
})
})
})