Scala: запись в файл внутри foreachRDD

Я использую потоковую передачу Spark для обработки данных, поступающих с Kafka. И я хотел бы записать результат в файл (на локальном). Когда я печатаю на консоли, все работает нормально, и я получаю свои результаты, но когда я пытаюсь записать это в файл, я получаю ошибку.

я использую PrintWriter чтобы сделать это, но я получаю эту ошибку:

Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
java.io.PrintWriter
Serialization stack:
    - object not serializable (class: java.io.PrintWriter, value: java.io.PrintWriter@20f6f88c)
    - field (class: streaming.followProduction$$anonfun$main$1, name: qualityWriter$1, type: class java.io.PrintWriter)
    - object (class streaming.followProduction$$anonfun$main$1, <function1>)
    - field (class: streaming.followProduction$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class streaming.followProduction$$anonfun$main$1)
    - object (class streaming.followProduction$$anonfun$main$1$$anonfun$apply$1, <function1>)
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData, 

Я думаю, что я не могу использовать писателя в ForeachRDD!

Вот мой код:

object followProduction extends Serializable {

  def main(args: Array[String]) = {

    val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
    qualityWriter.append("dateTime , quality , status \n")

    val sparkConf = new SparkConf().setMaster("spark://address:7077").setAppName("followProcess").set("spark.streaming.concurrentJobs", "4")
    val sc = new StreamingContext(sparkConf, Seconds(10))

    sc.checkpoint("checkpoint")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "address:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> s"${UUID.randomUUID().toString}",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("A", "C")

    topics.foreach(t => {

      val stream = KafkaUtils.createDirectStream[String, String](
        sc,
        PreferConsistent,
        Subscribe[String, String](Array(t), kafkaParams)
      )

      stream.foreachRDD(rdd => {

        rdd.collect().foreach(i => {

          val record = i.value()
          val newCsvRecord = process(t, record)

          println(newCsvRecord)

          qualityWriter.append(newCsvRecord)

        })
      })

    })

    qualityWriter.close()

    sc.start()
    sc.awaitTermination()

  }

  var componentQuantity: componentQuantity = new componentQuantity("", 0.0, 0.0, 0.0)
  var diskQuality: diskQuality = new diskQuality("", 0.0)

  def process(topic: String, record: String): String = topic match {
    case "A" => componentQuantity.checkQuantity(record)
    case "C" => diskQuality.followQuality(record)
  }
}

У меня есть этот класс, я звоню:

case class diskQuality(datetime: String, quality: Double) extends Serializable {

  def followQuality(record: String): String = {

    val dateFormat: SimpleDateFormat = new SimpleDateFormat("dd-mm-yyyy hh:mm:ss")


    var recQuality = msgParse(record).quality
    var date: Date = dateFormat.parse(msgParse(record).datetime)
    var recDateTime = new SimpleDateFormat("dd-mm-yyyy hh:mm:ss").format(date)

    // some operations here

    return recDateTime + " , " + recQuality

  }

  def msgParse(value: String): diskQuality = {

    import org.json4s._
    import org.json4s.native.JsonMethods._

    implicit val formats = DefaultFormats

    val res = parse(value).extract[diskQuality]
    return res

  }
}

Как мне этого добиться? Я новичок и в Spark, и в Scala, так что, возможно, я не все делаю правильно. Спасибо за ваше время

РЕДАКТИРОВАТЬ:

Я изменил свой код и больше не получаю эту ошибку. Но в то же время у меня есть только первая строка в моем файле, и записи не добавляются. Писатель (handleWriter) внутри фактически не работает.

Вот мой код:

stream.foreachRDD(rdd => {

    val qualityWriter = new PrintWriter(file)
    qualityWriter.write("dateTime , quality , status \n")
    qualityWriter.close()

    rdd.collect().foreach(i =>
    {
      val record = i.value()

      val newCsvRecord = process(topic , record)

      val handleWriter = new PrintWriter(file)
      handleWriter.append(newCsvRecord)
      handleWriter.close()

      println(newCsvRecord)
    })
  })

Где я скучал? Может быть, я делаю это неправильно...

2 ответа

Решение

PrintWriter является локальным ресурсом, связанным с одной машиной и не может быть сериализован.

Чтобы удалить этот объект из плана сериализации Java, мы можем объявить его @transient, Это означает, что форма сериализации followProduction объект не будет пытаться сериализовать это поле.

В коде вопроса это должно быть объявлено как:

@transient val qualityWriter = new PrintWriter(new File("diskQuality.txt"))

Тогда становится возможным использовать его в рамках foreachRDD закрытие.

Но этот процесс не решает проблемы, связанные с правильной обработкой файла. qualityWriter.close() будет выполнен при первом проходе потокового задания, а файловый дескриптор будет закрыт для записи во время выполнения задания. Чтобы правильно использовать местные ресурсы, такие как FileЯ бы последовал предложению Ювала воссоздать PrintWriter в foreachRDD закрытие. Недостающий кусок объявляет новый PrintWritter в режиме добавления. Измененный код в пределах foreachRDD будет выглядеть так (внесение дополнительных изменений в код):

// Initialization phase

val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
qualityWriter.println("dateTime , quality , status")
qualityWriter.close()

....

dstream.foreachRDD{ rdd => 

  val data = rdd.map(e => e.value())
                .collect() // get the data locally
                .map(i=> process(topic , i))  // create csv records
  val allRecords = data.mkString("\n") // why do I/O if we can do in-mem?     
  val handleWriter = new PrintWriter(file, append=true)
  handleWriter.append(allRecords)
  handleWriter.close()

}

Несколько замечаний о коде в вопросе:

"spark.streaming.concurrentJobs", "4"

Это создаст проблему с записью нескольких потоков в один и тот же локальный файл. Это, вероятно, также неправильно используется в этом контексте.

sc.checkpoint ("контрольная точка")

Кажется, нет необходимости в контрольных точках на этой работе.

Самое простое, что можно сделать, это создать экземпляр PrintWriter внутри foreachRDD, это означает, что оно не будет захвачено закрытием функции:

stream.foreachRDD(rdd => {
  val qualityWriter = new PrintWriter(new File("diskQuality.txt"))
  qualityWriter.append("dateTime , quality , status \n")  

  rdd.collect().foreach(i => {
    val record = i.value()
    val newCsvRecord = process(t, record)
    qualityWriter.append(newCsvRecord)
    })
  })
})
Другие вопросы по тегам