Отбрасывайте строки в искре, которые не соответствуют схеме

В настоящее время схема моей таблицы:

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)

Я хочу применить приведенную ниже схему к приведенной выше таблице и удалить все строки, которые не соответствуют приведенной ниже схеме:

val productsSchema = StructType(Seq(
    StructField("product_id",IntegerType,nullable = true),
    StructField("product_name",StringType,nullable = true),
    StructField("aisle_id",IntegerType,nullable = true),
    StructField("department_id",IntegerType,nullable = true)
  ))

3 ответа

Решение

Используйте опцию "DROPMALFORMED" при загрузке данных, которая игнорирует поврежденные записи.

spark.read.format("json")
  .option("mode", "DROPMALFORMED")
  .option("header", "true")
  .schema(productsSchema)
  .load("sample.json")

Если данные не соответствуют схеме, искра поместит nullкак значение в этом столбце. Нам просто нужно отфильтровать нулевые значения для всех столбцов.

Используемый filter для фильтрации `` нулевых '' значений для всех столбцов.

scala> "cat /tmp/sample.json".! // JSON File Data, one row is not matching with schema.
{"product_id":1,"product_name":"sampleA","aisle_id":"AA","department_id":"AAD"}
{"product_id":2,"product_name":"sampleBB","aisle_id":"AAB","department_id":"AADB"}
{"product_id":3,"product_name":"sampleCC","aisle_id":"CC","department_id":"CCC"}
{"product_id":3,"product_name":"sampledd","aisle_id":"dd","departmentId":"ddd"}
{"name","srinivas","age":29}
res100: Int = 0

scala> schema.printTreeString
root
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)


scala> val df = spark.read.schema(schema).option("badRecordsPath", "/tmp/badRecordsPath").format("json").load("/tmp/sample.json") // Loading Json data & if schema is not matching we will be getting null rows for all columns.
df: org.apache.spark.sql.DataFrame = [aisle_id: string, department_id: string ... 2 more fields]

scala> df.show(false)
+--------+-------------+----------+------------+
|aisle_id|department_id|product_id|product_name|
+--------+-------------+----------+------------+
|AA      |AAD          |1         |sampleA     |
|AAB     |AADB         |2         |sampleBB    |
|CC      |CCC          |3         |sampleCC    |
|dd      |null         |3         |sampledd    |
|null    |null         |null      |null        |
+--------+-------------+----------+------------+


scala> df.filter(df.columns.map(c => s"${c} is not null").mkString(" or ")).show(false) // Filter null rows.
+--------+-------------+----------+------------+
|aisle_id|department_id|product_id|product_name|
+--------+-------------+----------+------------+
|AA      |AAD          |1         |sampleA     |
|AAB     |AADB         |2         |sampleBB    |
|CC      |CCC          |3         |sampleCC    |
|dd      |null         |3         |sampledd    |
+--------+-------------+----------+------------+


scala>

Проверить na.drop функции на data-frame, вы можете отбрасывать строки на основе значений NULL, минимальных значений NULL в строке, а также на основе определенного столбца, который имеет значения NULL.

scala> sc.parallelize(Seq((1,"a","a"),(1,"a","a"),(2,"b","b"),(3,"c","c"),(4,"d","d"),(4,"d",null))).toDF
res7: org.apache.spark.sql.DataFrame = [_1: int, _2: string ... 1 more field]

scala> res7.show()
+---+---+----+
| _1| _2|  _3|
+---+---+----+
|  1|  a|   a|
|  1|  a|   a|
|  2|  b|   b|
|  3|  c|   c|
|  4|  d|   d|
|  4|  d|null|
+---+---+----+

//dropping row if a null is found
scala> res7.na.drop.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  a|  a|
|  1|  a|  a|
|  2|  b|  b|
|  3|  c|  c|
|  4|  d|  d|
+---+---+---+

//drops only if `minNonNulls = 3` if accepted to each row
scala> res7.na.drop(minNonNulls = 3).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  a|  a|
|  1|  a|  a|
|  2|  b|  b|
|  3|  c|  c|
|  4|  d|  d|
+---+---+---+

//not dropping any
scala> res7.na.drop(minNonNulls = 2).show()
+---+---+----+
| _1| _2|  _3|
+---+---+----+
|  1|  a|   a|
|  1|  a|   a|
|  2|  b|   b|
|  3|  c|   c|
|  4|  d|   d|
|  4|  d|null|
+---+---+----+

//drops row based on nulls in `_3` column
scala> res7.na.drop(Seq("_3")).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  a|  a|
|  1|  a|  a|
|  2|  b|  b|
|  3|  c|  c|
|  4|  d|  d|
+---+---+---+

Другие вопросы по тегам