avro4s не может десериализовать AnyRef
У меня есть простой класс дела
case class KafkaContainer(key: String, payload: AnyRef)
тогда я хочу отправить это в тему kafka через производителя я делаю это
val byteArrayStream = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
output.write(msg)
output.close()
val bytes = byteArrayStream.toByteArray
producer.send(new ProducerRecord("my_topic", msg.key, bytes))
и это работает хорошо
тогда я пытаюсь потреблять это
Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
.map { msg =>
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
input.close()
...
}.runWith(Sink.ignore)
и это хорошо работает с любым классом в полезной нагрузке.
Но! Если это AnyRef. Потребительский код не работает с
Ошибка:(38, 96) не удалось найти неявное значение для параметра улик типа com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer] val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
Ошибка:(38, 96) недостаточно аргументов для двоичного метода: (неявное доказательство $21: com.sksamuel.avro4s.SchemaFor[test.messages.KafkaContainer], неявное доказательство $22: com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer])com.sksamuel.avro4s.AvroBinaryInputStream[test.messages.KafkaContainer]. Неуказанное значение параметра свидетельствует о $22. val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
если я объявлю последствия с
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
это не скомпилировать с
Ошибка:(58, 71) не удалось найти неявное значение Lazy типа com.sksamuel.avro4s.FromValue[Object] неявное значение val из Record: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
Ошибка:(58, 71) недостаточно аргументов для метода lazyConverter: (неявно fromValue: shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]])shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]]. Не указано значение параметра fromValue. неявный val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
если добавить каждый неявный, что complier требуется
lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
компиляция завершается с ошибкой
Ошибка:(58, 69) исключение при расширении макроса: java.lang.IllegalArgumentException: требование не выполнено: Требуется класс case, но объект находится не в scala.Predef$.require(Predef.scala:277) на com.sksamuel.avro4s.FromRecord$.applyImpl(FromRecord.scala:283) неявный val fromRecordObject: FromRecord[Object] = FromRecord[Object]
но если я заменю AnyRef для какого-то класса - неявный не требуется, все снова работает нормально
0 ответов
У меня похожая проблема с использованием любого типа данных. Вы должны указать, какие типы для этой переменной-члена допустимы, поскольку Any или AnyRef может быть чем угодно. Затем используйте Either или shapeless (также см. Документацию Github). В моем случае это может быть String, Long, Double или null, поэтому с помощью shapeless вы можете сделать:
case class DataContainer(name: String, value: Option[String:+:Long:+:Double:+:CNil])
Это преобразует в тип объединения в AVRO:
{
"name" : "value",
"type" : [ "null", "string", "long", "double" ]
}