Spark Streaming - Как получить результаты из функции foreachRDD?

Я пытаюсь читать сообщения Кафки с помощью Spark Streaming, делать некоторые вычисления и отправлять результаты другому процессу.

val jsonObject = new JSONObject

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

stream.foreachRDD { rdd => {
  val jsonDF = spark.read.json(rdd.map(_._2))
  val res = jsonDF.groupBy("artist").count.sort(col("count").desc).take(10)
  /*Some Transformations => create jsonArray*/
  jsonObject.put("Key", jsonArray)
}}

ssc.start()

Мне нужно накопить JSONObject (глобальная переменная) для моего требования. put операция выдает исключение NotSerializable.

java.io.NotSerializableException: Объект org.apache.spark.streaming.kafka.DirectKafkaInputDStream$MappedDStream сериализуется, возможно, как часть закрытия операции RDD. Это связано с тем, что на объект DStream ссылаются из замыкания. Пожалуйста, перепишите операцию RDD внутри этого DStream, чтобы избежать этого. Это было сделано, чтобы избежать раздувания задач Spark с ненужными объектами.

Можно ли отправить этот jsonArray из этого блока foreahRDD? Я не хочу писать в файлы или базы данных.

0 ответов

Другие вопросы по тегам