pyspark json прочитал, чтобы пометить плохие записи
Я хочу использовать pyspark для разбора файлов с данными json и хочу пометить "плохие / неожиданные" записи. Под "плохими / неожиданными записями" я подразумеваю те, которые не следуют указанной мной схеме. У меня есть этот входной файл и хочу указать схему. Это работает, когда данные в ожидаемом формате согласно схеме. (inp1.json) Не работает, если во входном файле данные имеют неправильный формат. (inp2.json) В этом случае, он просто читает весь файл / dataframe как ноль. Что я хочу, так это просто рассматривать эту запись как поврежденную и читать оставшиеся 3 строки. Любое предложение, пожалуйста.
inp1.json (data in correct format)
[{"last_name": ["ln1", ""], "city": ["c1", "c2"]},
{"last_name": ["ln3", "ln4"], "city": ["c10", "c20"]},
{"last_name": ["ln2"], "city": ["c1", "c2"]}]
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType, DoubleType
myschema = StructType([
StructField('city', ArrayType(StringType(), True), True),
StructField('last_name', ArrayType(StringType(), True), True)
])
sc = SparkContext(appName=app)
inp_file="inp1.json"
spark = SparkSession.builder.appName("read_json").config("spark.some.config.option","some-value").enableHiveSupport().getOrCreate()
raw_df = spark.read.json(inp_file,multiLine=True, schema=myschema)
print "raw_df"
raw_df.show(truncate=False)
raw_df
+----------+----------+
|city |last_name |
+----------+----------+
|[c1, c2] |[ln1, ] |
|[c10, c20]|[ln3, ln4]|
|[c1, c2] |[ln2] |
+----------+----------+
Пробный прогон для данных с плохой записью
inp2.json (data in in correct format, please note that last_name in the last record is not an array, but just a string)
[{"last_name": ["ln1", ""], "city": ["c1", "c2"]},
{"last_name": ["ln3", "ln4"], "city": ["c10", "c20"]},
{"last_name": ["ln2"], "city": ["c1", "c2"]},{"last_name": "ln4", "city": ["c4", "c5"]}]
raw_df
+----+---------+
|city|last_name|
+----+---------+
|null|null |
+----+---------+
1 ответ
Вы можете указать mode=DROPMALFORMED
вариант при чтении json.
raw_df = spark.read.option('mode','DROPMALFORMED').json(inp_file,multiLine=True, schema=myschema)
https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/DataFrameReader.html