О том, как программным способом создать объект схемы org.apache.spark.sql.types.StructType, начиная с файла json.

Мне нужно создать собственный объект схемы org.apache.spark.sql.types.StructType с информацией из файла json, файл json может быть любым, поэтому я параметризовал его в файле свойств.

Вот так выглядит файл свойств:

//ruta al esquema del fichero output (por defecto se infiere el esquema del Parquet destino). Si existe, el esquema será en formato JSON, aplicable a DataFrame (ver StructType.fromJson)
schema.parquet=/Users/XXXX/Desktop/generated_schema.json
writing.mode=overwrite
separator=;
header=false

Файл generate_schema.json выглядит так:

{"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}

Итак, вот как я думал, что я могу решить это:

val path: Path = new Path(mra_schema_parquet)
val fileSystem = path.getFileSystem(sc.hadoopConfiguration)
val inputStream: FSDataInputStream = fileSystem.open(path)
val schema_json = Stream.cons(inputStream.readLine(), Stream.continually( inputStream.readLine))

System.out.println("schema_json looks like "  + schema_json.head)

val mySchemaStructType :DataType = DataType.fromJson(schema_json.head)

/*
After this line, mySchemaStructType have four StructFields objects inside it, the same than appears at schema_json
*/
logger.info(mySchemaStructType)

val myStructType = new StructType()
myStructType.add("mySchemaStructType",mySchemaStructType)

/*

After this line, myStructType have zero StructFields! here must be the bug, myStructType should have the four StructFields that represents the loaded schema json! this must be the error! but how can i construct the necessary StructType object?

*/

myDF = loadCSV(sqlContext, path_input_csv,separator,myStructType,header)
System.out.println("myDF.schema.json looks like " + myDF.schema.json)
inputStream.close()

df.write
  .format("com.databricks.spark.csv")
  .option("header", header)
  .option("delimiter",delimiter)
  .option("nullValue","")
  .option("treatEmptyValuesAsNulls","true")
  .mode(saveMode)
  .parquet(pathParquet)

Когда код запускает последнюю строку, .parquet(pathParquet), возникает исключение:

**parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
}**

Вывод этого кода выглядит следующим образом:

16/11/11 13:57:04 INFO AnotherCSVtoParquet$: The job started using this propertie file: /Users/aisidoro/Desktop/mra-csv-converter/parametrizacion.properties
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_input_csv is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_output_parquet  is /Users/aisidoro/Desktop/output900000
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: mra_schema_parquet is /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: writting_mode is overwrite
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: separator is ;
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: header is false
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: ATTENTION! aplying mra_schema_parquet  /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
schema_json looks like {"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}
16/11/11 13:57:12 INFO AnotherCSVtoParquet$: StructType(StructField(codigo,StringType,true), StructField(otro,StringType,true), StructField(vacio,StringType,true), StructField(final,StringType,true))
 16/11/11 13:57:13 INFO AnotherCSVtoParquet$: loadCSV. header is false, inferSchema is false pathCSV is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv separator is ;
 myDF.schema.json looks like {"type":"struct","fields":[]}

Должно быть, что объект schema_json и объект myDF.schema.json должны иметь одинаковое содержимое, не так ли? но этого не произошло. Я думаю, что это должно запустить ошибку.

В конце концов, задание рушится с этим исключением:

**parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
}**

Дело в том, что если я не предоставлю какой-либо файл схемы JSON, работа будет выполняться нормально, но с этой схемой...

Кто-нибудь может мне помочь? Я просто хочу создать несколько паркетных файлов, начиная с файла CSV и файла схемы JSON.

Спасибо.

Зависимости:

    <spark.version>1.5.0-cdh5.5.2</spark.version>
    <databricks.version>1.5.0</databricks.version>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>${databricks.version}</version>
    </dependency>

ОБНОВИТЬ

Я вижу, что есть открытый вопрос,

https://github.com/databricks/spark-csv/issues/61

2 ответа

Решение

Наконец я нашел проблему.

Проблема была в этих строках:

val myStructType = new StructType()
myStructType.add("mySchemaStructType",mySchemaStructType)

я должен использовать эту строку:

val mySchemaStructType = DataType.fromJson(schema_json.head).asInstanceOf[StructType]

Я должен привести StructType из DataType, чтобы все заработало.

Так как вы сказали Custom SchemaВы можете сделать что-то вроде этого.

val schema = (new StructType).add("field1", StringType).add("field2", StringType)
sqlContext.read.schema(schema).json("/json/file/path").show

Кроме того, посмотрите на это и это

Вы можете создать вложенную схему JSON, как показано ниже.

Например:

{
  "field1": {
    "field2": {
      "field3": "create",
      "field4": 1452121277
    }
  }
}

val schema = (new StructType)
  .add("field1", (new StructType)
    .add("field2", (new StructType)
      .add("field3", StringType)
      .add("field4", LongType)
    )
  )
Другие вопросы по тегам