Bijection - Сериализация Java Avro

Я ищу пример, чтобы сделать Bijection на Avro SpecificRecordBase объект похож на GenericRecordBase или если есть более простой способ использовать AvroSerializer Класс как ключ Kafka и сериализатор значений.

Injection<GenericRecord, byte[]> genericRecordInjection =
                                        GenericAvroCodecs.toBinary(schema);
byte[] bytes = genericRecordInjection.apply(type);

2 ответа

https://github.com/miguno/kafka-storm-starter предоставляет такой пример кода.

См. Например, AvroDecoderBolt. Из его ага:

Этот болт ожидает поступления данных в двоичном формате с кодировкой Avro, сериализованным в соответствии со схемой Avro T, Это будет десериализовать входящие данные в T pojo, и испускайте это pojo для нижестоящих потребителей. Как таковой этот болт можно считать штормовым эквивалентом Twitter Bijection's Injection.invert[T, Array[Byte]](bytes) для данных Avro.

где

T: Тип записи Avro (например, Tweet) на основе используемой базовой схемы Avro. Должно быть подклассом Авро SpecificRecordBase,

Ключевая часть кода (я свернул код в этот фрагмент):

// With T <: SpecificRecordBase

implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]

val bytes: Array[Byte] = ...; // the Avro-encoded data
val decodeTry: Try[T] = Injection.invert(bytes)
decodeTry match {
  case Success(pojo) =>
    System.out.println("Binary data decoded into pojo: " + pojo)
  case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}
Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(new File("/Users/.../schema.avsc"));
            Injection<Command, byte[]> objectInjection = SpecificAvroCodecs.toBinary(schema);
            byte[] bytes = objectInjection.apply(c);
Другие вопросы по тегам