"Задача не сериализуема" пытается разобрать JSON

В то время как

import play.api.libs.json._

case class Person(name: String, lovesPandas: Boolean)
implicit val personFormat = Json.format[Person]

val text = """{"name":"Sparky The Bear", "lovesPandas":true}"""

val jsonParse = Json.parse(text)
val result = Json.fromJson[Person](jsonParse)
result.get

работает на ноутбуке Jupyter с ядром Apache Toree,

import org.apache.spark._
import play.api.libs.json._
import play.api.libs.functional.syntax._

case class Person(name: String, lovesPandas: Boolean)
implicit val personReads = Json.format[Person]

val text = """{"name":"Sparky The Bear", "lovesPandas":true}"""

val input = sc.parallelize(List(text))
val parsed = input.map(Json.parse(_))
val result = parsed.flatMap(record => {    
    personReads.reads(record).asOpt
})
result.filter(_.lovesPandas).map(Json.toJson(_)).saveAsTextFile("files/out/pandainfo.json")

возвращается

Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
[...]

хотя указанный пример получен из https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/BasicParseJson.scala

Я понимаю, что объекты, передаваемые на другие узлы, должны быть сериализованы, и это, по-видимому, невозможно. Так что-то не так с примером или я что-то не так делаю? Как это исправить?


Кстати

import org.apache.spark._
import play.api.libs.json._
import play.api.libs.functional.syntax._

val text = """{"name":"Sparky The Bear", "lovesPandas":true}"""

case class Person(name: String, lovesPandas: Boolean)

val input = sc.parallelize(List(text))
val parsed = input.map(Json.parse(_))
val result = parsed.flatMap(record => {
    implicit val personReads = Json.format[Person]
    personReads.reads(record).asOpt
})
result.collect

приведет к

Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 3.0 in stage 0.0 (TID 3) had a not serializable result: play.api.libs.json.OFormat$$anon$1
Serialization stack:
    - object not serializable (class: play.api.libs.json.OFormat$$anon$1, value: 
[...]

я использовал result.collect проверить правильность этой части кода.

Кроме того, если я напишу

result. filter(_.lovesPandas).map{Json.toJson(_)}.saveAsTextFile("files/out/pandainfo.json")

вместо result.collect я получил

Name: Compile Error
Message: <console>:166: error: No Json serializer found for type Person. Try to implement an implicit Writes or Format for this type.
                      Json.toJson(_)
                                 ^
StackTrace: 

так что я думаю, должен объявить Person быть Serializable, Тем не менее, добавив extends Serializable на это в конце концов не имеет никакого эффекта, в то время как with Serializable выдает ошибку

Name: Compile Error
Message: <console>:2: error: ';' expected but 'with' found.
       case class Person(name: String, lovesPandas: Boolean) with Serializable
                                                             ^

1 ответ

Я пойду догадкой и скажу значение, возвращаемое Json.format не сериализуем.

Чтобы обойти это, вы можете объявить значение внутри flatMap:

val result = parsed.flatMap(record => {    
  val personReads = Json.format[Person]
  val jsValue = Json.parse(record)
  personReads.reads(jsValue).asOpt
})

редактировать

Я думаю, что причиной проблем является тот факт, что Json.parse возвращается JsValue который не сериализуем.

Вы можете сузить это до одного map:

sc
  .parallelize(List(text))
  .map(record => {
    val personReads = Json.format[Person]
    val jsValue = Json.parse(record)
    personReads.reads(jsValue).asOpt
  })
 .filter(_.lovesPandas)
 .map(Json.toJson(_).toString)
 .saveAsTextFile("files/out/pandainfo.json")
Другие вопросы по тегам