Nodejs avro-сериализация без реестра схем с последующей десериализацией в Kafka Streams
Я хотел бы попросить совета по следующей проблеме. Я пытаюсь узнать, как выполнить сериализацию данных Avro с помощью nodejs без реестра схем, опубликовать его в кластере Kafka, а затем получить его в Kafka Streams (Java).
На стороне javascript я попытался использовать kafka-node вместе с avsc для сериализации. В Kafka Streams я решил реализовать собственный Serde, поскольку, насколько я понимаю, Avro Serdes, предоставляемые Streams API, предназначены для получения схем непосредственно из реестра схем.
Вот фрагмент кода javascript для простого производителя:
const avro = require('avsc');
const messageKey = "1";
const schemaType = avro.Type.forSchema({
type: "record",
name: "product",
fields: [
{
name: "id",
type: "int"
},
{
name: "name",
type: "string"
},
{
name: "price",
type: "double"
},
{
name: "stock",
type: "int"
}
]
});
const messageValueBuffer = schemaType.toBuffer({id, name, stock, price});
const payload = [{topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0}];
producer.send(payload, sendCallback);
И вот как я сейчас пытаюсь реализовать десериализатор:
public Product deserialize(String topic, byte[] data) {
SeekableByteArrayInput inputstream = new SeekableByteArrayInput(data);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader;
Product product = null;
try {
dataFileReader = new DataFileReader<GenericRecord>(inputstream, datumReader);
GenericRecord record = new GenericData.Record(schema);
while(dataFileReader.hasNext()) {
dataFileReader.next();
product = genericRecordToObject(record, new Product());
}
} catch (IOException e) {
e.printStackTrace();
}
return product;
}
Однако, когда приложение потоков пытается десериализовать данные, я сталкиваюсь со следующей ошибкой, особенно в строке кода, где создается экземпляр DataFileReader:
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:138)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:128)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:1)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:168)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:109)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:156)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:925)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:763)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Я не понимаю, как действовать дальше. Любой совет будет принят во внимание.
1 ответ
Возможно, я ошибаюсь, но я думаю, что вам не следует использовать DataFileReader, только DatumReader.
Я сделал что-то подобное в kafka (не в Kafka Streams), возможно, могу дать вам несколько идей:
Полный пример (очень простой) находится здесь:https://github.com/anigmo97/KafkaRecipes/blob/master/java/consumers/StringKeyAvroValueConsumers/StandardAvro/StandardAvroConsumer.java
Как видите, я не создавал сериализатор, я десериализовал значение и получил Generic Record.
public static void main(String[] args) {
final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProperties());
consumer.subscribe(Collections.singleton(TOPIC));
ConsumerRecords<String, byte[]> consumerRecords;
String valueSchemaString = "{\"type\": \"record\",\"namespace\": \"example.avro\",\"name\": \"test_record\","
+ "\"fields\":[" + "{\"name\": \"id\",\"type\": \"int\"},"
+ "{\"name\": \"date\",\"type\": [\"int\", \"null\"]}," + "{\"name\": \"info\",\"type\": \"string\"}"
+ "]}}";
Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroValueSchema);
try {
while (true) {
consumerRecords = consumer.poll(1000);
consumerRecords.forEach(record -> {
ByteArrayInputStream inputStream = new ByteArrayInputStream(record.value());
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
GenericRecord deserializedValue = null;
try {
deserializedValue = datumReader.read(null, binaryDecoder);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.printf("Consumer Record:(%s, %s)\n", record.key(), deserializedValue);
});
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
System.out.println("DONE");
}
}
Я надеюсь, что это помогает.