Удаление вложенного столбца Dataframe с помощью PySpark

Я пытаюсь удалить несколько вложенных столбцов в кадре данных Spark с помощью pyspark. Я нашел это для Scala, которое, кажется, делает именно то, что я хочу, но я не знаком со Scala и не знаю, как написать это на Python.

/questions/12456135/udalenie-vlozhennogo-stolbtsa-iz-spark-dataframe/12456147#12456147

Я был бы очень признателен за помощь.

Спасибо,

6 ответов

Теперь мы можем сделать это изначально с помощью Spark версии> = 3.0.

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.dropFields.html

Версия Pyspark:

def drop_col(df, col_nm, delete_col_nm):
    fields_to_keep = filter(lambda x:  x != delete_col_nm, df.select(" {}.*".format(col_nm)).columns)
    fields_to_keep = list(map(lambda x:  "{}.{}".format(col_nm, x), fields_to_keep))
    return df.withColumn(col_nm, struct(fields_to_keep))

Метод, который я нашел с помощью pyspark, заключается в том, чтобы сначала преобразовать вложенный столбец в json, а затем проанализировать преобразованный json с новой вложенной схемой с отфильтрованными нежелательными столбцами.

Предположим, у меня есть следующая схема, и я хочу отбросить d а также e (a.b.d, a.e) из кадра данных:

root
 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- c: long (nullable = true)
 |    |    |-- d: string (nullable = true)
 |    |-- e: struct (nullable = true)
 |    |    |-- f: long (nullable = true)
 |    |    |-- g: string (nullable = true)
 |-- h: string (nullable = true)

Я использовал следующий подход:

  1. Создать новую схему для a исключая d а также e, Быстрый способ сделать это - вручную выбрать нужные поля. df.select("a").schema и создайте новую схему из выбранных полей, используя StructType, Или вы можете сделать это программным путем, обходя дерево схемы и исключая ненужные поля, например:

    def exclude_nested_field(schema, unwanted_fields, parent=""):
        new_schema = []
    
        for field in schema:
            full_field_name = field.name
            if parent:
                full_field_name = parent + "." + full_field_name
    
            if full_field_name not in unwanted_fields:
                if isinstance(field.dataType, StructType):
                    inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, inner_schema))
                else:
                    new_schema.append(StructField(field.name, field.dataType))
    
        return StructType(new_schema)
    
    new_schema = exclude_nested_field(df.select("a").schema, ["a.b.d", "a.e"])
    
  2. Перерабатывать a столбец в JSON: F.to_json("a")

  3. Разобрать json-конвертированный a столбец из шага 2 с новой схемой, найденной в шаге 1: F.from_json("a_json", new_schema)

Имея приведенный ниже кадр данных, цель состоит в том, чтобы удалить и .

      from pyspark.sql import functions as F
df = spark.createDataFrame([], "a struct<b:struct<c:bigint,d:string>,e:struct<f:bigint,g:string>,h:array<struct<i:string,j:string>>>, k string")
df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |    |-- d: string (nullable = true)      # <<--- to be dropped
#  |    |-- e: struct (nullable = true)           # <<--- to be dropped
#  |    |    |-- f: long (nullable = true)
#  |    |    |-- g: string (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)  # <<--- to be dropped
#  |-- k: string (nullable = true)

eсамый простой:

      df = df.withColumn("a", F.col("a").dropFields("e"))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |    |-- d: string (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)
#  |-- k: string (nullable = true)

Для того, чтобы броситьd, мы должны войти внутрьb:

      df = df.withColumn("a", F.col("a").withField("b", F.col("a.b").dropFields("d")))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |    |    |    |-- j: string (nullable = true)
#  |-- k: string (nullable = true)

jнаходится внутри массива, поэтомуtransformтакже необходимо использовать. Он «зацикливается» на каждом элементе массива (в данном случае элемент является структурой) и преобразует его (удаляет поле).

      df = df.withColumn("a", F.col("a").withField(
    "h",
    F.transform(
        F.col("a.h"),
        lambda x: x.dropFields("j")
    )
))

df.printSchema()
# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- c: long (nullable = true)
#  |    |-- h: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- i: string (nullable = true)
#  |-- k: string (nullable = true)

Хотя у меня нет решения для PySpark, возможно, проще перевести это на python. Рассмотрим фрейм данных df со схемой:

root
 |-- employee: struct (nullable = false)
 |    |-- name: string (nullable = false)
 |    |-- age: integer (nullable = false)

Тогда, если вы хотите, например, уронить name, ты можешь сделать:

val fieldsToKeep = df.select($"employee.*").columns
.filter(_!="name") // the nested column you want to drop
.map(n => "employee."+n)

// overwite column with subset of fields
df
.withColumn("employee",struct(fieldsToKeep.head,fieldsToKeep.tail:_*)) 

Версия Pyspark ответа Raphaels Scala.

Он работает на определенной глубине, отбрасывает все, что выше этой глубины, и фильтрует строку ниже.

def remove_columns(df,root):
  from pyspark.sql.functions import col
  cols = df.select(root).columns
  fields_filter = filter(lambda x: x[0]!= "$", cols) # use your own lambda here. 
  fieldsToKeep = list(map(lambda x: root[:-1] + x, fields_filter)) 
  return df.select(fieldsToKeep)

df = remove_metadata(raw_df, root="level1.level2.*")
Другие вопросы по тегам