Удаление вложенного столбца Dataframe с помощью PySpark
Я пытаюсь удалить несколько вложенных столбцов в кадре данных Spark с помощью pyspark. Я нашел это для Scala, которое, кажется, делает именно то, что я хочу, но я не знаком со Scala и не знаю, как написать это на Python.
/questions/12456135/udalenie-vlozhennogo-stolbtsa-iz-spark-dataframe/12456147#12456147
Я был бы очень признателен за помощь.
Спасибо,
6 ответов
Теперь мы можем сделать это изначально с помощью Spark версии> = 3.0.
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.dropFields.html
Версия Pyspark:
def drop_col(df, col_nm, delete_col_nm):
fields_to_keep = filter(lambda x: x != delete_col_nm, df.select(" {}.*".format(col_nm)).columns)
fields_to_keep = list(map(lambda x: "{}.{}".format(col_nm, x), fields_to_keep))
return df.withColumn(col_nm, struct(fields_to_keep))
Метод, который я нашел с помощью pyspark, заключается в том, чтобы сначала преобразовать вложенный столбец в json, а затем проанализировать преобразованный json с новой вложенной схемой с отфильтрованными нежелательными столбцами.
Предположим, у меня есть следующая схема, и я хочу отбросить d
а также e
(a.b.d
, a.e
) из кадра данных:
root
|-- a: struct (nullable = true)
| |-- b: struct (nullable = true)
| | |-- c: long (nullable = true)
| | |-- d: string (nullable = true)
| |-- e: struct (nullable = true)
| | |-- f: long (nullable = true)
| | |-- g: string (nullable = true)
|-- h: string (nullable = true)
Я использовал следующий подход:
Создать новую схему для
a
исключаяd
а такжеe
, Быстрый способ сделать это - вручную выбрать нужные поля.df.select("a").schema
и создайте новую схему из выбранных полей, используяStructType
, Или вы можете сделать это программным путем, обходя дерево схемы и исключая ненужные поля, например:def exclude_nested_field(schema, unwanted_fields, parent=""): new_schema = [] for field in schema: full_field_name = field.name if parent: full_field_name = parent + "." + full_field_name if full_field_name not in unwanted_fields: if isinstance(field.dataType, StructType): inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name) new_schema.append(StructField(field.name, inner_schema)) else: new_schema.append(StructField(field.name, field.dataType)) return StructType(new_schema) new_schema = exclude_nested_field(df.select("a").schema, ["a.b.d", "a.e"])
Перерабатывать
a
столбец в JSON:F.to_json("a")
- Разобрать json-конвертированный
a
столбец из шага 2 с новой схемой, найденной в шаге 1:F.from_json("a_json", new_schema)
Имея приведенный ниже кадр данных, цель состоит в том, чтобы удалить и .
from pyspark.sql import functions as F
df = spark.createDataFrame([], "a struct<b:struct<c:bigint,d:string>,e:struct<f:bigint,g:string>,h:array<struct<i:string,j:string>>>, k string")
df.printSchema()
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- c: long (nullable = true)
# | | |-- d: string (nullable = true) # <<--- to be dropped
# | |-- e: struct (nullable = true) # <<--- to be dropped
# | | |-- f: long (nullable = true)
# | | |-- g: string (nullable = true)
# | |-- h: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- i: string (nullable = true)
# | | | |-- j: string (nullable = true) # <<--- to be dropped
# |-- k: string (nullable = true)
e
самый простой:
df = df.withColumn("a", F.col("a").dropFields("e"))
df.printSchema()
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- c: long (nullable = true)
# | | |-- d: string (nullable = true)
# | |-- h: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- i: string (nullable = true)
# | | | |-- j: string (nullable = true)
# |-- k: string (nullable = true)
Для того, чтобы броситьd
, мы должны войти внутрьb
:
df = df.withColumn("a", F.col("a").withField("b", F.col("a.b").dropFields("d")))
df.printSchema()
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- c: long (nullable = true)
# | |-- h: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- i: string (nullable = true)
# | | | |-- j: string (nullable = true)
# |-- k: string (nullable = true)
j
находится внутри массива, поэтомуtransform
также необходимо использовать. Он «зацикливается» на каждом элементе массива (в данном случае элемент является структурой) и преобразует его (удаляет поле).
df = df.withColumn("a", F.col("a").withField(
"h",
F.transform(
F.col("a.h"),
lambda x: x.dropFields("j")
)
))
df.printSchema()
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- c: long (nullable = true)
# | |-- h: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- i: string (nullable = true)
# |-- k: string (nullable = true)
Хотя у меня нет решения для PySpark, возможно, проще перевести это на python. Рассмотрим фрейм данных df
со схемой:
root
|-- employee: struct (nullable = false)
| |-- name: string (nullable = false)
| |-- age: integer (nullable = false)
Тогда, если вы хотите, например, уронить name
, ты можешь сделать:
val fieldsToKeep = df.select($"employee.*").columns
.filter(_!="name") // the nested column you want to drop
.map(n => "employee."+n)
// overwite column with subset of fields
df
.withColumn("employee",struct(fieldsToKeep.head,fieldsToKeep.tail:_*))
Версия Pyspark ответа Raphaels Scala.
Он работает на определенной глубине, отбрасывает все, что выше этой глубины, и фильтрует строку ниже.
def remove_columns(df,root):
from pyspark.sql.functions import col
cols = df.select(root).columns
fields_filter = filter(lambda x: x[0]!= "$", cols) # use your own lambda here.
fieldsToKeep = list(map(lambda x: root[:-1] + x, fields_filter))
return df.select(fieldsToKeep)
df = remove_metadata(raw_df, root="level1.level2.*")