Декодирование перечислений Java / пользовательских нестандартных классов с использованием Structured Spark Streaming

Я пытаюсь использовать структурированную потоковую передачу в Spark 2.1.1 для чтения из Kafka и декодирования закодированных сообщений Avro. У меня есть UDF, определенный в соответствии с этим вопросом.

val sr = new CachedSchemaRegistryClient(conf.kafkaSchemaRegistryUrl, 100)
val deser = new KafkaAvroDeserializer(sr)

val decodeMessage = udf { bytes:Array[Byte] => deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead] }

val topic = conf.inputTopic
val df = session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", conf.kafkaServers)
    .option("subscribe", topic)
    .load()

df.printSchema()

val result = df.selectExpr("CAST(key AS STRING)", """decodeMessage($"value") as "value_des"""")

val query = result.writeStream
    .format("console")
    .outputMode(OutputMode.Append())
    .start()

Однако я получаю следующий сбой.

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type DeviceRelayStateEnum is not supported

Это терпит неудачу на этой линии

val decodeMessage = udf { bytes:Array[Byte] => deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead] }

Альтернативный подход состоял в том, чтобы определить кодировщики для пользовательских классов, которые у меня есть

implicit val enumEncoder = Encoders.javaSerialization[DeviceRelayStateEnum]
implicit val messageEncoder = Encoders.product[DeviceRead]

но это терпит неудачу со следующей ошибкой, когда messageEncoder регистрируется.

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for DeviceRelayStateEnum
- option value class: "DeviceRelayStateEnum"
- field (class: "scala.Option", name: "deviceRelayState")
- root class: "DeviceRead"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:476)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)

Когда я пытаюсь сделать это с помощью map после load() Я получаю следующую ошибку компиляции.

val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])

Error:(76, 26) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[DeviceRead])org.apache.spark.sql.Dataset[DeviceRead].
Unspecified value parameter evidence$6.
      val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])
Error:(76, 26) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
      val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])

Значит ли это, что я не могу использовать структурированный поток для перечислений Java? И это может использоваться только с примитивами или классами случаев?

Я прочитал несколько связанных вопросов 1, 2, 3 по этому вопросу, и кажется, что возможность указать собственный кодировщик для класса, т.е. UDT, была удалена в 2.1, а новая функциональность не была добавлена.

Любая помощь будет оценена.

1 ответ

Я думаю, что вы, возможно, просите слишком много в текущей версии структурированной потоковой передачи (и Spark SQL) в целом.

Я пока не смог полностью понять, как решить проблему с отсутствующими кодировщиками так называемым более профессиональным способом, но я столкнулся с той же проблемой, когда пытался создать Dataset перечислений. Это может просто не поддерживаться.

Структурированная потоковая передача - это просто потоковая библиотека поверх Spark SQL, которая использует ее для сериализации-десериализации (SerDe).

Чтобы кратко изложить историю и приступить к работе (пока вы не примете лучший способ), я бы рекомендовал избегать использования перечислений в бизнес-объектах, которые вы используете для представления схемы ваших наборов данных.

Итак, я бы рекомендовал сделать что-то вроде:

val decodeMessage = udf { bytes:Array[Byte] =>
  val dr = deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead]

  // do additional transformation here so you use a custom streaming-specific class
  // Here I'm using a simple tuple to hold what might be relevant
  // You could create a case class instead to have proper names
  (dr.id, dr.value)
}
Другие вопросы по тегам