Декодирование перечислений Java / пользовательских нестандартных классов с использованием Structured Spark Streaming
Я пытаюсь использовать структурированную потоковую передачу в Spark 2.1.1 для чтения из Kafka и декодирования закодированных сообщений Avro. У меня есть UDF, определенный в соответствии с этим вопросом.
val sr = new CachedSchemaRegistryClient(conf.kafkaSchemaRegistryUrl, 100)
val deser = new KafkaAvroDeserializer(sr)
val decodeMessage = udf { bytes:Array[Byte] => deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead] }
val topic = conf.inputTopic
val df = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", conf.kafkaServers)
.option("subscribe", topic)
.load()
df.printSchema()
val result = df.selectExpr("CAST(key AS STRING)", """decodeMessage($"value") as "value_des"""")
val query = result.writeStream
.format("console")
.outputMode(OutputMode.Append())
.start()
Однако я получаю следующий сбой.
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type DeviceRelayStateEnum is not supported
Это терпит неудачу на этой линии
val decodeMessage = udf { bytes:Array[Byte] => deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead] }
Альтернативный подход состоял в том, чтобы определить кодировщики для пользовательских классов, которые у меня есть
implicit val enumEncoder = Encoders.javaSerialization[DeviceRelayStateEnum]
implicit val messageEncoder = Encoders.product[DeviceRead]
но это терпит неудачу со следующей ошибкой, когда messageEncoder регистрируется.
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for DeviceRelayStateEnum
- option value class: "DeviceRelayStateEnum"
- field (class: "scala.Option", name: "deviceRelayState")
- root class: "DeviceRead"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:476)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
Когда я пытаюсь сделать это с помощью map
после load()
Я получаю следующую ошибку компиляции.
val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])
Error:(76, 26) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[DeviceRead])org.apache.spark.sql.Dataset[DeviceRead].
Unspecified value parameter evidence$6.
val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])
Error:(76, 26) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val result = df.map((bytes: Row) => deser.deserialize("topic", bytes.getAs[Array[Byte]]("value")).asInstanceOf[DeviceRead])
Значит ли это, что я не могу использовать структурированный поток для перечислений Java? И это может использоваться только с примитивами или классами случаев?
Я прочитал несколько связанных вопросов 1, 2, 3 по этому вопросу, и кажется, что возможность указать собственный кодировщик для класса, т.е. UDT, была удалена в 2.1, а новая функциональность не была добавлена.
Любая помощь будет оценена.
1 ответ
Я думаю, что вы, возможно, просите слишком много в текущей версии структурированной потоковой передачи (и Spark SQL) в целом.
Я пока не смог полностью понять, как решить проблему с отсутствующими кодировщиками так называемым более профессиональным способом, но я столкнулся с той же проблемой, когда пытался создать Dataset
перечислений. Это может просто не поддерживаться.
Структурированная потоковая передача - это просто потоковая библиотека поверх Spark SQL, которая использует ее для сериализации-десериализации (SerDe).
Чтобы кратко изложить историю и приступить к работе (пока вы не примете лучший способ), я бы рекомендовал избегать использования перечислений в бизнес-объектах, которые вы используете для представления схемы ваших наборов данных.
Итак, я бы рекомендовал сделать что-то вроде:
val decodeMessage = udf { bytes:Array[Byte] =>
val dr = deser.deserialize("topic.name", bytes).asInstanceOf[DeviceRead]
// do additional transformation here so you use a custom streaming-specific class
// Here I'm using a simple tuple to hold what might be relevant
// You could create a case class instead to have proper names
(dr.id, dr.value)
}