Pyspark: как преобразовать строки json в столбец данных

Ниже приведен более или менее прямой код Python, который функционально извлекается именно так, как я хочу. Схема данных для столбца, который я отфильтровываю внутри фрейма данных, в основном представляет собой строку json.

Однако для этого мне пришлось значительно увеличить требования к памяти, и я работаю только на одном узле. Использование сбора, вероятно, плохо, и создание всего этого на одном узле на самом деле не использует преимущества распределенной природы Spark.

Я бы хотел более ориентированное на Spark решение. Может ли кто-нибудь помочь мне поменять логику ниже, чтобы лучше воспользоваться Spark? Кроме того, в качестве учебного материала: пожалуйста, предоставьте объяснение, почему / как обновления делают его лучше.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json

from pyspark.sql.types import SchemaStruct, SchemaField, StringType


input_schema = SchemaStruct([
    SchemaField('scrubbed_col_name', StringType(), nullable=True)
])


output_schema = SchemaStruct([
    SchemaField('val01_field_name', StringType(), nullable=True),
    SchemaField('val02_field_name', StringType(), nullable=True)
])


example_input = [
    '''[{"val01_field_name": "val01_a", "val02_field_name": "val02_a"},
        {"val01_field_name": "val01_a", "val02_field_name": "val02_b"},
        {"val01_field_name": "val01_b", "val02_field_name": "val02_c"}]''',
    '''[{"val01_field_name": "val01_c", "val02_field_name": "val02_a"}]''',
    '''[{"val01_field_name": "val01_a", "val02_field_name": "val02_d"}]''',
]

desired_output = {
    'val01_a': ['val_02_a', 'val_02_b', 'val_02_d'],
    'val01_b': ['val_02_c'],
    'val01_c': ['val_02_a'],
}


def capture(dataframe):
    # Capture column from data frame if it's not empty
    data = dataframe.filter('scrubbed_col_name != null')\
                    .select('scrubbed_col_name')\
                    .rdd\
                    .collect()

    # Create a mapping of val1: list(val2)
    mapping = {}
    # For every row in the rdd
    for row in data:
        # For each json_string within the row
        for json_string in row:
            # For each item within the json string
            for val in json.loads(json_string):
                # Extract the data properly
                val01 = val.get('val01_field_name')
                val02 = val.get('val02_field_name')
                if val02 not in mapping.get(val01, []):
                    mapping.setdefault(val01, []).append(val02)
    return mapping

1 ответ

Решение

Одно из возможных решений:

(df
  .rdd  # Convert to rdd
  .flatMap(lambda x: x)  # Flatten rows
  # Parse JSON. In practice you should add proper exception handling
  .flatMap(lambda x: json.loads(x))
  # Get values
  .map(lambda x: (x.get('val01_field_name'), x.get('val02_field_name')))
  # Convert to final shape
  .groupByKey())

При заданной спецификации вывода эта операция не совсем эффективна (вам действительно нужны сгруппированные значения?), Но все же намного лучше, чем collect,

Другие вопросы по тегам