avro4s не может десериализовать AnyRef

У меня есть простой класс дела

case class KafkaContainer(key: String, payload: AnyRef)

тогда я хочу отправить это в тему kafka через производителя я делаю это

val byteArrayStream = new ByteArrayOutputStream()
      val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
      output.write(msg)
      output.close()
      val bytes = byteArrayStream.toByteArray
      producer.send(new ProducerRecord("my_topic", msg.key, bytes))

и это работает хорошо

тогда я пытаюсь потреблять это

Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
    .map { msg =>
      val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
      val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
      val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
      input.close()
        ...
    }.runWith(Sink.ignore)

и это хорошо работает с любым классом в полезной нагрузке.

Но! Если это AnyRef. Потребительский код не работает с

Ошибка:(38, 96) не удалось найти неявное значение для параметра улик типа com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer] val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer

Ошибка:(38, 96) недостаточно аргументов для двоичного метода: (неявное доказательство $21: com.sksamuel.avro4s.SchemaFor[test.messages.KafkaContainer], неявное доказательство $22: com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer])com.sksamuel.avro4s.AvroBinaryInputStream[test.messages.KafkaContainer]. Неуказанное значение параметра свидетельствует о $22. val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer

если я объявлю последствия с

implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

это не скомпилировать с

Ошибка:(58, 71) не удалось найти неявное значение Lazy типа com.sksamuel.avro4s.FromValue[Object] неявное значение val из Record: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

Ошибка:(58, 71) недостаточно аргументов для метода lazyConverter: (неявно fromValue: shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]])shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]]. Не указано значение параметра fromValue. неявный val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

если добавить каждый неявный, что complier требуется

lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

компиляция завершается с ошибкой

Ошибка:(58, 69) исключение при расширении макроса: java.lang.IllegalArgumentException: требование не выполнено: Требуется класс case, но объект находится не в scala.Predef$.require(Predef.scala:277) на com.sksamuel.avro4s.FromRecord$.applyImpl(FromRecord.scala:283) неявный val fromRecordObject: FromRecord[Object] = FromRecord[Object]

но если я заменю AnyRef для какого-то класса - неявный не требуется, все снова работает нормально

0 ответов

У меня похожая проблема с использованием любого типа данных. Вы должны указать, какие типы для этой переменной-члена допустимы, поскольку Any или AnyRef может быть чем угодно. Затем используйте Either или shapeless (также см. Документацию Github). В моем случае это может быть String, Long, Double или null, поэтому с помощью shapeless вы можете сделать:

case class DataContainer(name: String, value: Option[String:+:Long:+:Double:+:CNil])

Это преобразует в тип объединения в AVRO:

{
    "name" : "value",
    "type" : [ "null", "string", "long", "double" ]
}
Другие вопросы по тегам