Преобразование StructType в Avro Schema возвращает тип как Union при использовании блоков данных spark-avro
Я использую блоки данных spark-avro для преобразования схемы данных в схему avro. Возвращенная схема avro не может иметь значение по умолчанию. Это вызывает проблемы, когда я пытаюсь создать общую запись из схемы. Может ли кто-нибудь помочь с правильным способом использования этой функции?
Dataset<Row> sellableDs = sparkSession.sql("sql query");
SchemaBuilder.RecordBuilder<Schema> rb = SchemaBuilder.record("testrecord").namespace("test_namespace");
Schema sc = SchemaConverters.convertStructToAvro(sellableDs.schema(), rb, "test_namespace");
System.out.println(sc.toString());
System.out.println(sc.getFields().get(0).toString());
String schemaString = sc.toString();
sellableDs.foreach(
(ForeachFunction<Row>) row -> {
Schema scEx = new Schema.Parser().parse(schemaString);
GenericRecord gr;
gr = new GenericData.Record(scEx);
System.out.println("Generic record Created");
int fieldSize = scEx.getFields().size();
for (int i = 0; i < fieldSize; i++ ) {
// System.out.println( row.get(i).toString());
System.out.println("field: " + scEx.getFields().get(i).toString() + "::" + "value:" + row.get(i));
gr.put(scEx.getFields().get(i).toString(), row.get(i));
//i++;
}
}
);
Это схема df:
StructType(StructField(key,IntegerType,true), StructField(value,DoubleType,true))
Это авро преобразованная схема:
{"type":"record","name":"testrecord","namespace":"test_namespace","fields":[{"name":"key","type":["int","null"]},{"name":"value","type":["double","null"]}]}
1 ответ
Проблема заключается в том, что класс SchemaConverters не включает значения по умолчанию как часть создания схемы. У вас есть 2 варианта: изменить схему, добавив значения по умолчанию перед созданием записи, или заполнить запись перед созданием какого-либо значения (это могут быть значения из вашей строки). Например, ноль. Это пример того, как создать запись, используя вашу схему
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.avro.Schema
var schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"testrecord\",\"namespace\":\"test_namespace\",\"fields\":[{\"name\":\"key\",\"type\":[\"int\",\"null\"]},{\"name\":\"value\",\"type\":[\"double\",\"null\"]}]}")
var builder = new GenericRecordBuilder(schema);
for (i <- 0 to schema.getFields().size() - 1 ) {
builder.set(schema.getFields().get(i).name(), null)
}
var record = builder.build();
print(record.toString())