Развитие схемы с помощью Spark DataFrame
Я работаю с фреймом данных Spark, который может загружать данные из одной из трех версий схемы:
// Original
{ "A": {"B": 1 } }
// Addition "C"
{ "A": {"B": 1 }, "C": 2 }
// Additional "A.D"
{ "A": {"B": 1, "D": 3 }, "C": 2 }
Я могу обработать дополнительную букву "C", проверив, содержит ли схема поле "C", и если нет, добавив новый столбец в фрейм данных. Однако я не могу понять, как создать поле для подобъекта.
public void evolvingSchema() {
String versionOne = "{ \"A\": {\"B\": 1 } }";
String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";
process(spark.getContext(), "1", versionOne);
process(spark.getContext(), "2", versionTwo);
process(spark.getContext(), "2", versionThree);
}
private static void process(JavaSparkContext sc, String version, String data) {
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().json(sc.parallelize(Arrays.asList(data)));
if(!Arrays.asList(df.schema().fieldNames()).contains("C")) {
df = df.withColumn("C", org.apache.spark.sql.functions.lit(null));
}
// Not sure what to put here. The fieldNames does not contain the "A.D"
try {
df.select("C").collect();
} catch(Exception e) {
System.out.println("Failed to C for " + version);
}
try {
df.select("A.D").collect();
} catch(Exception e) {
System.out.println("Failed to A.D for " + version);
}
}
2 ответа
Источники JSON не очень хорошо подходят для данных с развивающейся схемой (как, например, с Avro или Parquet), но простое решение состоит в том, чтобы использовать одну и ту же схему для всех источников и сделать новые поля необязательными / обнуляемыми:
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val schema = StructType(Seq(
StructField("A", StructType(Seq(
StructField("B", LongType, true),
StructField("D", LongType, true)
)), true),
StructField("C", LongType, true)))
Вы можете пройти schema
как это DataFrameReader
:
val rddV1 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 } }"))
val df1 = sqlContext.read.schema(schema).json(rddV1)
val rddV2 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 }, \"C\": 2 }"))
val df2 = sqlContext.read.schema(schema).json(rddV2)
val rddV3 = sc.parallelize(Seq("{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }"))
val df3 = sqlContext.read.schema(schema).json(rddV3)
и вы получите согласованную структуру, независимую от варианта:
require(df1.schema == df2.schema && df2.schema == df3.schema)
с отсутствующими столбцами, автоматически установленными на null
:
df1.printSchema
// root
// |-- A: struct (nullable = true)
// | |-- B: long (nullable = true)
// | |-- D: long (nullable = true)
// |-- C: long (nullable = true)
df1.show
// +--------+----+
// | A| C|
// +--------+----+
// |[1,null]|null|
// +--------+----+
df2.show
// +--------+---+
// | A| C|
// +--------+---+
// |[1,null]| 2|
// +--------+---+
df3.show
// +-----+---+
// | A| C|
// +-----+---+
// |[1,3]| 2|
// +-----+---+
Примечание:
Это решение зависит от источника данных. Это может или не может работать с другими источниками, или даже привести к неправильной записи.
zero323 уже ответил на вопрос, но в Scala. Это то же самое, но в Java.
public void evolvingSchema() {
String versionOne = "{ \"A\": {\"B\": 1 } }";
String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";
process(spark.getContext(), "1", versionOne);
process(spark.getContext(), "2", versionTwo);
process(spark.getContext(), "2", versionThree);
}
private static void process(JavaSparkContext sc, String version, String data) {
StructType schema = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("A",
DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("B", DataTypes.LongType, true),
DataTypes.createStructField("D", DataTypes.LongType, true))), true),
DataTypes.createStructField("C", DataTypes.LongType, true)));
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().schema(schema).json(sc.parallelize(Arrays.asList(data)));
try {
df.select("C").collect();
} catch(Exception e) {
System.out.println("Failed to C for " + version);
}
try {
df.select("A.D").collect();
} catch(Exception e) {
System.out.println("Failed to A.D for " + version);
}
}