Как конвертировать вложенный avro GenericRecord в строку
У меня есть код для преобразования моей записи Avro в строку с помощью функции avroToRowConverter()
directKafkaStream.foreachRDD(rdd -> {
JavaRDD<Row> newRDD= rdd.map(x->{
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
return avroToRowConverter(recordInjection.invert(x._2).get());
});
Эта функция не работает для вложенной схемы (TYPE= UNION)
,
private static Row avroToRowConverter(GenericRecord avroRecord) {
if (null == avroRecord) {
return null;
}
//GenericData
Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
for (Schema.Field field : avroRecord.getSchema().getFields()) {
if(field.schema().getType().toString().equalsIgnoreCase("STRING") || field.schema().getType().toString().equalsIgnoreCase("ENUM")){
objectArray[field.pos()] = ""+avroRecord.get(field.pos());
}else {
objectArray[field.pos()] = avroRecord.get(field.pos());
}
}
return new GenericRowWithSchema(objectArray, structType);
}
Может кто-нибудь предложить, как я могу преобразовать сложную схему в ROW?
1 ответ
Решение
Есть SchemaConverters.createConverterToSQL
но это к сожалению личное. Есть PR, чтобы сделать это публичным, но они никогда не были объединены:
Хотя есть обходной путь, который мы использовали.
Вы можете выставить это, создав класс в com.databricks.spark.avro
пакет:
package com.databricks.spark.avro
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType
object MySchemaConversions {
def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}
Затем вы можете использовать его в своем коде так:
final DataType myAvroType = SchemaConverters.toSqlType(MyAvroRecord.getClassSchema()).dataType();
final Function1<GenericRecord, Row> myAvroRecordConverter =
MySchemaConversions.createConverterToSQL(MyAvroRecord.getClassSchema(), myAvroType);
Row[] convertAvroRecordsToRows(List<GenericRecord> records) {
return records.stream().map(myAvroRecordConverter::apply).toArray(Row[]::new);
}
Для одной записи вы можете просто назвать это так:
final Row row = myAvroRecordConverter.apply(record);