Pyspark: разобрать столбец строк json

У меня есть pyspark dataframe, состоящий из одного столбца, называемого jsonгде каждая строка является строкой Юникода в формате json. Я хотел бы проанализировать каждую строку и вернуть новый фрейм данных, где каждая строка - это проанализированный json.

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

Я попытался сопоставить каждый ряд с json.loads:

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

Но это возвращает TypeError: expected string or buffer

Я подозреваю, что часть проблемы заключается в том, что при конвертации из dataframe для rddинформация о схеме теряется, поэтому я также попытался вручную ввести информацию о схеме:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

Но я получаю то же самое TypeError,

Глядя на этот ответ, выглядит как выравнивание строк flatMap может быть полезно здесь, но у меня тоже нет успеха:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

Я получаю эту ошибку: AttributeError: 'unicode' object has no attribute 'get',

7 ответов

Решение

Преобразование кадра данных со строками json в структурированный кадр данных на самом деле довольно прост, если вы преобразуете кадр данных в RDD строк раньше (см. http://spark.apache.org/docs/latest/sql-programming-guide.html)

Например:

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)

Для Spark 2.1+ вы можете использовать from_json что позволяет сохранить другие столбцы не-json в кадре данных следующим образом:

from pyspark.sql.functions import from_json
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

Вы позволяете Spark получить схему строкового столбца json. Тогда df.json Столбец больше не является StringType, а является правильно декодированной структурой json, т.е. StrucType и все остальные столбцы df сохраняются как есть.

Вы можете получить доступ к содержимому json следующим образом:

df.select(col('json.header').alias('header'))

Существующие ответы не сработают, если у вас JSON отличается от идеального / традиционного форматирования. Например, вывод схемы на основе rdd ожидает JSON в фигурных скобках {} и предоставит неверную схему (в результате null значения), если, например, ваши данные выглядят так:

[
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

Я написал функцию, чтобы обойти эту проблему, очистив JSON так, чтобы он жил в другом объекте JSON.

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackru.com/a/45880574)/>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:

        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    i,
                    psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                )
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))

        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res

Замечания: psf знак равно pyspark.sql.functions,

Вот краткая (искровой SQL) версия @nolan-conaway's parseJSONCols функция.

SELECT 
explode(
    from_json(
        concat('{"data":', 
               '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', 
               '}'), 
        'data array<struct<a:DOUBLE, b:INT>>'
    ).data) as data;

PS. Я также добавил функцию разнесения:P

Вам нужно знать некоторые типы HIVE SQL

Если вы не знаете схему каждого JSON (а она может быть другой), вы можете использовать:

      from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
 
# ... here you get your DF

# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))

От: https://github.com/apache/spark/pull/22775

      from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

def map2json(dict):
    import json
    return json.dumps(dict)
from pyspark.sql.types import StringType
spark.udf.register("map2json", lambda dict: map2json(dict), StringType())

spark.sql("select map2json(map('a', '1'))").show()

Этот ответ предназначен для дополнительного контекста, если ваши строки JSON представляют собой массивы JSON, а не объекты (я не могу комментировать, так как у меня нет представителя). Если вы используете надежный ответ Мартина Таппа, он вернет нулевые значения для ваших столбцов.

тл;др

Если ваши строки JSON являются объектами массива, например:

      [{"a":1, "b":1.0}]

вернет кадр данных, который содержит схему элементов в этих массивах, а не сам массив. не доволен этим, поэтому, чтобы быть настолько конкретным, насколько он хочет, вы можете обернуть схему, выведенную вArrayTypeи он будет правильно анализировать (вместо того, чтобы возвращать нулевые значения для всего).

      from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType

array_item_schema = \
  spark.read.json(df.rdd.map(lambda row: row['json_string_column'])).schema

json_array_schema = ArrayType(array_item_schema, True)

arrays_df = df.select(F.from_json('json_string_column', json_array_schema).alias('json_arrays'))

objects_df = arrays_df.select(F.explode('json_arrays').alias('objects'))

вступление

В качестве дополнения к Нолану Конавэю кажется, что когда ваш JSON имеет форму

      [
  {
    "a": 1.0,
    "b": 1
  },
  {
    "a": 0.0,
    "b": 2
  }
]

где объект верхнего уровня представляет собой массив (а не объект), pyspark рассматривает массив как набор объектов, которые необходимо преобразовать в строки, а не в одну строку.

См. пример запуска в оболочке PySpark 3.3.0:

      >>> myjson        = """[{"a": 1.0,"b": 1},{"a": 2.0,"b": 2}]"""
>>> myotherjson   = """[{"a": 3.0,"b": 3}]"""
>>> rawobjectjson = """{"a": 4.0,"b": 4}"""
>>> spark_read_df = spark.read.json(sc.parallelize([myjson,myotherjson,rawobjectjson]))
>>> spark_read_df.show()
+---+---+
|  a|  b|
+---+---+
|1.0|  1|
|2.0|  2|
|3.0|  3|
|4.0|  4|
+---+---+

>>> spark_read_df.printSchema()
root
 |-- a: double (nullable = true)
 |-- b: long (nullable = true)

Мы видим, чтоmyjsonиmyotherjsonкоторые были массивами JSON объектов JSON, были расширены, чтобы иметь строку для каждого содержащегося в них объекта. Это также гладко обрабатывается, когда одна из строк JSON является просто необработанным объектом. Я думаю, что документации здесь немного не хватает, так как я не смог найти упоминания об этой обработке объектов массива.

Теперь давайте создадим фрейм данных со столбцом строк JSON. Собираюсь броситьrawobjectjsonпотому что, как мы увидим, каждая строка должна иметь одинаковую схему (включая массив верхнего уровня, если он присутствует).

      >>> from pyspark.sql.types import StructType, StructField, StringType, ArrayType
>>> json_string_data = [
...     (myjson,),
...     (myotherjson,),
... ]
>>> json_df_schema = StructType([
...     StructField('json_strings', StringType(), True),
... ])
>>> raw_json_df = spark.createDataFrame(data=json_string_data, schema=json_df_schema)
>>> raw_json_df.show()
+--------------------+
|        json_strings|
+--------------------+
|[{"a": 1.0,"b": 1...|
| [{"a": 3.0,"b": 3}]|
+--------------------+

Теперь вот где я попытался использовать схему, выведеннуюspark.read.jsonперейти к для чтения столбца JSON в объекты, но он продолжал возвращать столбцы, которые были полностью null. Как упомянул Нолан Конауэй, это произойдет, когда схема перейдет кfrom_jsonне может быть применен к заданным строкам.

Проблема в том, что в этих строках он видит верхний уровень как массив, а какspark_read_df.printSchema()показывает, схема, выведеннаяspark.read.json()игнорирует уровень массива.

Решение

Таким образом, решение, к которому я пришел, просто учитывало массив верхнего уровня в схеме при чтении.

      from pyspark.sql import functions as F

# This one won't work for directly passing to from_json as it ignores top-level arrays in json strings
# (if any)!
# json_object_schema = spark_read_df.schema()

# from_json is a bit more "simple", it directly applies the schema to the string. In this case
# the top level type is actually an array, so a simple fix is to just wrap the schema that
# spark.read.json returned in an ArrayType to match the true JSON string
json_array_schema = ArrayType(spark_read_df.schema, True)

json_extracted_df = raw_json_df.select(
    F.from_json('json_strings', json_array_schema)
        .alias('json_arrays')
)
      >>> json_extracted_df.show()
+--------------------+
|         json_arrays|
+--------------------+
|[{1.0, 1}, {2.0, 2}]|
|          [{3.0, 3}]|
+--------------------+

>>> json_extracted_df.printSchema()
root
 |-- json_arrays: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: double (nullable = true)
 |    |    |-- b: long (nullable = true)

Оттуда объекты можно вытащить из массива, используяpyspark.sql.functions.explode:

      >>> exploded_df = json_extracted_df.select(F.explode('json_arrays').alias('objects'))
>>> exploded_df.show()
+--------+
| objects|
+--------+
|{1.0, 1}|
|{2.0, 2}|
|{3.0, 3}|
+--------+

>>> exploded_df.printSchema()
root
 |-- objects: struct (nullable = true)
 |    |-- a: double (nullable = true)
 |    |-- b: long (nullable = true)
Другие вопросы по тегам