Сериализация общих записей avro в виде массива [Byte] сохраняет схему в объекте

ситуация

В настоящее время я пишу для потребителя / производителя, используя AVRO и репозиторий схемы.

Исходя из того, что я собираю, мои варианты для сериализации этих данных либо использовать Avro-сериализатор Confluent, либо перейти на Twitter Bijection.

Казалось, Биекция выглядела наиболее просто.

Поэтому я хочу произвести дату в следующем формате ProducerRecord[String,Array[Byte]], это сводится к [некоторый идентификатор строки, сериализованный GenericRecord]

(примечание: я собираюсь сделать общие записи, так как эта кодовая база должна обрабатывать тысячи схем, которые анализируются из Json/csv/...)

Вопрос:

Единственная причина, по которой я сериализую и использую AVRO, заключается в том, что вам не нужно иметь схему в самих данных (как это было бы с Json/XML/...).
Однако при проверке данных в теме я вижу, что вся схема содержится вместе с данными. Я делаю что-то в корне неправильно, это из-за дизайна, или я должен вместо этого использовать сливной сериализатор?

Код:

  def jsonStringToAvro(jString: String, schema: Schema): GenericRecord = {
    val converter = new JsonAvroConverter
    val genericRecord = converter.convertToGenericDataRecord(jString.replaceAll("\\\\/","_").getBytes(), schema)

    genericRecord
  }
def serializeAsByteArray(avroRecord: GenericRecord): Array[Byte] = {
    //val genericRecordInjection = GenericAvroCodecs.toBinary(avroRecord.getSchema)
    val r: Array[Byte] = GenericAvroCodecs.toBinary(avroRecord.getSchema).apply(avroRecord)

    r
  }

//schema comes from a rest call to the schema repository
new ProducerRecord[String, Array[Byte]](topic, myStringKeyGoesHere, serializeAsByteArray(jsonStringToAvro(jsonObjectAsStringGoesHere, schema)))


        producer.send(producerRecord, new Callback {...})

1 ответ

Решение

Если вы посмотрите на исходный код Confluent, то увидите, что порядок взаимодействия с хранилищем схемы

  1. Возьмите схему из записи Avro и вычислите ее ID. В идеале, отправка схемы в хранилище или иное хеширование должно дать вам идентификатор.
  2. Выделить ByteBuffer
  3. Записать возвращенный идентификатор в буфер
  4. Запишите значение объекта Avro (исключая схему) в виде байтов в буфер
  5. Отправить этот байтовый буфер Кафке

В настоящее время ваше использование Bijection будет включать схему в байтах, а не заменять ее идентификатором

Другие вопросы по тегам