Сериализация общих записей avro в виде массива [Byte] сохраняет схему в объекте
ситуация
В настоящее время я пишу для потребителя / производителя, используя AVRO и репозиторий схемы.
Исходя из того, что я собираю, мои варианты для сериализации этих данных либо использовать Avro-сериализатор Confluent, либо перейти на Twitter Bijection.
Казалось, Биекция выглядела наиболее просто.
Поэтому я хочу произвести дату в следующем формате ProducerRecord[String,Array[Byte]]
, это сводится к [некоторый идентификатор строки, сериализованный GenericRecord]
(примечание: я собираюсь сделать общие записи, так как эта кодовая база должна обрабатывать тысячи схем, которые анализируются из Json/csv/...)
Вопрос:
Единственная причина, по которой я сериализую и использую AVRO, заключается в том, что вам не нужно иметь схему в самих данных (как это было бы с Json/XML/...).
Однако при проверке данных в теме я вижу, что вся схема содержится вместе с данными. Я делаю что-то в корне неправильно, это из-за дизайна, или я должен вместо этого использовать сливной сериализатор?
Код:
def jsonStringToAvro(jString: String, schema: Schema): GenericRecord = {
val converter = new JsonAvroConverter
val genericRecord = converter.convertToGenericDataRecord(jString.replaceAll("\\\\/","_").getBytes(), schema)
genericRecord
}
def serializeAsByteArray(avroRecord: GenericRecord): Array[Byte] = {
//val genericRecordInjection = GenericAvroCodecs.toBinary(avroRecord.getSchema)
val r: Array[Byte] = GenericAvroCodecs.toBinary(avroRecord.getSchema).apply(avroRecord)
r
}
//schema comes from a rest call to the schema repository
new ProducerRecord[String, Array[Byte]](topic, myStringKeyGoesHere, serializeAsByteArray(jsonStringToAvro(jsonObjectAsStringGoesHere, schema)))
producer.send(producerRecord, new Callback {...})
1 ответ
Если вы посмотрите на исходный код Confluent, то увидите, что порядок взаимодействия с хранилищем схемы
- Возьмите схему из записи Avro и вычислите ее ID. В идеале, отправка схемы в хранилище или иное хеширование должно дать вам идентификатор.
- Выделить ByteBuffer
- Записать возвращенный идентификатор в буфер
- Запишите значение объекта Avro (исключая схему) в виде байтов в буфер
- Отправить этот байтовый буфер Кафке
В настоящее время ваше использование Bijection будет включать схему в байтах, а не заменять ее идентификатором