Bijection - Сериализация Java Avro
Я ищу пример, чтобы сделать Bijection на Avro SpecificRecordBase
объект похож на GenericRecordBase
или если есть более простой способ использовать AvroSerializer
Класс как ключ Kafka и сериализатор значений.
Injection<GenericRecord, byte[]> genericRecordInjection =
GenericAvroCodecs.toBinary(schema);
byte[] bytes = genericRecordInjection.apply(type);
2 ответа
https://github.com/miguno/kafka-storm-starter предоставляет такой пример кода.
См. Например, AvroDecoderBolt. Из его ага:
Этот болт ожидает поступления данных в двоичном формате с кодировкой Avro, сериализованным в соответствии со схемой Avro
T
, Это будет десериализовать входящие данные вT
pojo, и испускайте это pojo для нижестоящих потребителей. Как таковой этот болт можно считать штормовым эквивалентом Twitter Bijection'sInjection.invert[T, Array[Byte]](bytes)
для данных Avro.
где
T
: Тип записи Avro (например,Tweet
) на основе используемой базовой схемы Avro. Должно быть подклассом АвроSpecificRecordBase
,
Ключевая часть кода (я свернул код в этот фрагмент):
// With T <: SpecificRecordBase
implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
val bytes: Array[Byte] = ...; // the Avro-encoded data
val decodeTry: Try[T] = Injection.invert(bytes)
decodeTry match {
case Success(pojo) =>
System.out.println("Binary data decoded into pojo: " + pojo)
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("/Users/.../schema.avsc"));
Injection<Command, byte[]> objectInjection = SpecificAvroCodecs.toBinary(schema);
byte[] bytes = objectInjection.apply(c);