Удаление вложенного столбца из Spark DataFrame
У меня есть DataFrame
со схемой
root
|-- label: string (nullable = true)
|-- features: struct (nullable = true)
| |-- feat1: string (nullable = true)
| |-- feat2: string (nullable = true)
| |-- feat3: string (nullable = true)
Хотя я могу отфильтровать фрейм данных с помощью
val data = rawData
.filter( !(rawData("features.feat1") <=> "100") )
Я не могу удалить столбцы, используя
val data = rawData
.drop("features.feat1")
Это то, что я делаю здесь неправильно? Я тоже пытался (безуспешно) делать drop(rawData("features.feat1"))
Хотя это не имеет особого смысла.
Заранее спасибо,
Нихилу
11 ответов
Это всего лишь упражнение по программированию, но вы можете попробовать что-то вроде этого:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try
case class DFWithDropFrom(df: DataFrame) {
def getSourceField(source: String): Try[StructField] = {
Try(df.schema.fields.filter(_.name == source).head)
}
def getType(sourceField: StructField): Try[StructType] = {
Try(sourceField.dataType.asInstanceOf[StructType])
}
def genOutputCol(names: Array[String], source: String): Column = {
f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
}
def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
getSourceField(source)
.flatMap(getType)
.map(_.fieldNames.diff(toDrop))
.map(genOutputCol(_, source))
.map(df.withColumn(source, _))
.getOrElse(df)
}
}
Пример использования:
scala> case class features(feat1: String, feat2: String, feat3: String)
defined class features
scala> case class record(label: String, features: features)
defined class record
scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]
scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
+-------+--------+
| label|features|
+-------+--------+
|a_label| [f2,f3]|
+-------+--------+
scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
+-------+----------+
| label| features|
+-------+----------+
|a_label|[f1,f2,f3]|
+-------+----------+
Добавьте неявное преобразование, и все готово.
Эта версия позволяет удалять вложенные столбцы на любом уровне:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, DataType}
/**
* Various Spark utilities and extensions of DataFrame
*/
object DataFrameUtils {
private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
if (fullColName.equals(dropColName)) {
None
} else {
colType match {
case colType: StructType =>
if (dropColName.startsWith(s"${fullColName}.")) {
Some(struct(
colType.fields
.flatMap(f =>
dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
case Some(x) => Some(x.alias(f.name))
case None => None
})
: _*))
} else {
Some(col)
}
case other => Some(col)
}
}
}
protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
df.schema.fields
.flatMap(f => {
if (colName.startsWith(s"${f.name}.")) {
dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
case Some(x) => Some((f.name, x))
case None => None
}
} else {
None
}
})
.foldLeft(df.drop(colName)) {
case (df, (colName, column)) => df.withColumn(colName, column)
}
}
/**
* Extended version of DataFrame that allows to operate on nested fields
*/
implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
/**
* Drops nested field from DataFrame
*
* @param colName Dot-separated nested field name
*/
def dropNestedColumn(colName: String): DataFrame = {
DataFrameUtils.dropColumn(df, colName)
}
}
}
Использование:
import DataFrameUtils._
df.dropNestedColumn("a.b.c.d")
Для Spark 3.1+ вы можете использовать методdropFields
в столбцах типа структуры:
Выражение, которое удаляет поля в StructType по имени. Это не работает, если схема не содержит имен полей.
val df = sql("SELECT named_struct('feat1', 1, 'feat2', 2, 'feat3', 3) features")
val df1 = df.withColumn("features", $"features".dropFields("feat1"))
Расширяю спектом ответ. С поддержкой типов массивов:
object DataFrameUtils {
private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
if (fullColName.equals(dropColName)) {
None
} else if (dropColName.startsWith(s"$fullColName.")) {
colType match {
case colType: StructType =>
Some(struct(
colType.fields
.flatMap(f =>
dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
case Some(x) => Some(x.alias(f.name))
case None => None
})
: _*))
case colType: ArrayType =>
colType.elementType match {
case innerType: StructType =>
Some(struct(innerType.fields
.flatMap(f =>
dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
case Some(x) => Some(x.alias(f.name))
case None => None
})
: _*))
}
case other => Some(col)
}
} else {
Some(col)
}
}
protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
df.schema.fields
.flatMap(f => {
if (colName.startsWith(s"${f.name}.")) {
dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
case Some(x) => Some((f.name, x))
case None => None
}
} else {
None
}
})
.foldLeft(df.drop(colName)) {
case (df, (colName, column)) => df.withColumn(colName, column)
}
}
/**
* Extended version of DataFrame that allows to operate on nested fields
*/
implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
/**
* Drops nested field from DataFrame
*
* @param colName Dot-separated nested field name
*/
def dropNestedColumn(colName: String): DataFrame = {
DataFrameUtils.dropColumn(df, colName)
}
}
}
Я подробно остановлюсь на ответе mmendez.semantic здесь и расскажу о проблемах, описанных в подпотоке.
def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
if (fullColName.equals(dropColName)) {
None
} else if (dropColName.startsWith(s"$fullColName.")) {
colType match {
case colType: StructType =>
Some(struct(
colType.fields
.flatMap(f =>
dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
case Some(x) => Some(x.alias(f.name))
case None => None
})
: _*))
case colType: ArrayType =>
colType.elementType match {
case innerType: StructType =>
// we are potentially dropping a column from within a struct, that is itself inside an array
// Spark has some very strange behavior in this case, which they insist is not a bug
// see https://issues.apache.org/jira/browse/SPARK-31779 and associated comments
// and also the thread here: https://stackru.com/a/39943812/375670
// this is a workaround for that behavior
// first, get all struct fields
val innerFields = innerType.fields
// next, create a new type for all the struct fields EXCEPT the column that is to be dropped
// we will need this later
val preserveNamesStruct = ArrayType(StructType(
innerFields.filterNot(f => s"$fullColName.${f.name}".equals(dropColName))
))
// next, apply dropSubColumn recursively to build up the new values after dropping the column
val filteredInnerFields = innerFields.flatMap(f =>
dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
case Some(x) => Some(x.alias(f.name))
case None => None
}
)
// finally, use arrays_zip to unwrap the arrays that were introduced by building up the new. filtered
// struct in this way (see comments in SPARK-31779), and then cast to the StructType we created earlier
// to get the original names back
Some(arrays_zip(filteredInnerFields:_*).cast(preserveNamesStruct))
}
case _ => Some(col)
}
} else {
Some(col)
}
}
def dropColumn(df: DataFrame, colName: String): DataFrame = {
df.schema.fields.flatMap(f => {
if (colName.startsWith(s"${f.name}.")) {
dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
case Some(x) => Some((f.name, x))
case None => None
}
} else {
None
}
}).foldLeft(df.drop(colName)) {
case (df, (colName, column)) => df.withColumn(colName, column)
}
}
Использование в spark-shell
:
// if defining the functions above in your spark-shell session, you first need imports
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// now you can paste the function definitions
// create a deeply nested and complex JSON structure
val jsonData = """{
"foo": "bar",
"top": {
"child1": 5,
"child2": [
{
"child2First": "one",
"child2Second": 2,
"child2Third": -19.51
}
],
"child3": ["foo", "bar", "baz"],
"child4": [
{
"child2First": "two",
"child2Second": 3,
"child2Third": 16.78
}
]
}
}"""
// read it into a DataFrame
val df = spark.read.option("multiline", "true").json(Seq(jsonData).toDS())
// remove a sub-column
val modifiedDf = dropColumn(df, "top.child2.child2First")
modifiedDf.printSchema
root
|-- foo: string (nullable = true)
|-- top: struct (nullable = false)
| |-- child1: long (nullable = true)
| |-- child2: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
| |-- child3: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- child4: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child2First: string (nullable = true)
| | | |-- child2Second: long (nullable = true)
| | | |-- child2Third: double (nullable = true)
modifiedDf.show(truncate=false)
+---+------------------------------------------------------+
|foo|top |
+---+------------------------------------------------------+
|bar|[5, [[2, -19.51]], [foo, bar, baz], [[two, 3, 16.78]]]|
+---+------------------------------------------------------+
Другой способ (PySpark) - отказаться от features.feat1
столбец путем создания features
еще раз:
from pyspark.sql.functions import col, arrays_zip
display(df
.withColumn("features", arrays_zip("features.feat2", "features.feat3"))
.withColumn("features", col("features").cast(schema))
)
куда schema
это новая схема (исключая features.feat1
).
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType(
[
StructField('feat2', StringType(), True),
StructField('feat3', StringType(), True),
]
)
Следуя фрагменту кода spektom для scala, я создал похожий код на Java. Поскольку Java 8 не имеет foldLeft, я использовал forEachOrdered. Этот код подходит для spark 2.x (я использую 2.1). Также я заметил, что удаление столбца и добавление его с помощью withColumn с тем же именем не работает, поэтому я просто заменяю столбец, и кажется, что Работа.
Код не полностью протестирован, надеюсь, он работает:-)
public class DataFrameUtils {
public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) {
final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame);
Arrays.stream(dataFrame.schema().fields())
.flatMap( f -> {
if (columnName.startsWith(f.name() + ".")) {
final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName);
if (column.isPresent()) {
return Stream.of(new Tuple2<>(f.name(), column));
} else {
return Stream.empty();
}
} else {
return Stream.empty();
}
}).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple));
return dataFrameFolder.getDF();
}
private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
Optional<Column> column = Optional.empty();
if (!fullColumnName.equals(dropColumnName)) {
if (colType instanceof StructType) {
if (dropColumnName.startsWith(fullColumnName + ".")) {
column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName)));
}
} else {
column = Optional.of(col);
}
}
return column;
}
private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
return Arrays.stream(colType.fields())
.flatMap(f -> {
final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
fullColumnName + "." + f.name(), dropColumnName);
if (column.isPresent()) {
return Stream.of(column.get().alias(f.name()));
} else {
return Stream.empty();
}
}
).toArray(Column[]::new);
}
private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> {
private Dataset<Row> df;
public DataFrameFolder(Dataset<Row> df) {
this.df = df;
}
public Dataset<Row> getDF() {
return df;
}
@Override
public void accept(Tuple2<String, Optional<Column>> colTuple) {
if (!colTuple._2().isPresent()) {
df = df.drop(colTuple._1());
} else {
df = df.withColumn(colTuple._1(), colTuple._2().get());
}
}
}
Пример использования:
private class Pojo {
private String str;
private Integer number;
private List<String> strList;
private Pojo2 pojo2;
public String getStr() {
return str;
}
public Integer getNumber() {
return number;
}
public List<String> getStrList() {
return strList;
}
public Pojo2 getPojo2() {
return pojo2;
}
}
private class Pojo2 {
private String str;
private Integer number;
private List<String> strList;
public String getStr() {
return str;
}
public Integer getNumber() {
return number;
}
public List<String> getStrList() {
return strList;
}
}
SQLContext context = new SQLContext(new SparkContext("local[1]", "test"));
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class);
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");
Оригинальная структура:
root
|-- number: integer (nullable = true)
|-- pojo2: struct (nullable = true)
| |-- number: integer (nullable = true)
| |-- str: string (nullable = true)
| |-- strList: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- str: string (nullable = true)
|-- strList: array (nullable = true)
| |-- element: string (containsNull = true)
После падения:
root
|-- number: integer (nullable = true)
|-- pojo2: struct (nullable = false)
| |-- number: integer (nullable = true)
| |-- strList: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- str: string (nullable = true)
|-- strList: array (nullable = true)
| |-- element: string (containsNull = true)
Реализация PySpark
import pyspark.sql.functions as sf
def _drop_nested_field(
schema: StructType,
field_to_drop: str,
parents: List[str] = None,
) -> Column:
parents = list() if parents is None else parents
src_col = lambda field_names: sf.col('.'.join(f'`{c}`' for c in field_names))
if '.' in field_to_drop:
root, subfield = field_to_drop.split('.', maxsplit=1)
field_to_drop_from = next(f for f in schema.fields if f.name == root)
return sf.struct(
*[src_col(parents + [f.name]) for f in schema.fields if f.name != root],
_drop_nested_field(
schema=field_to_drop_from.dataType,
field_to_drop=subfield,
parents=parents + [root]
).alias(root)
)
else:
# select all columns except the one to drop
return sf.struct(
*[src_col(parents + [f.name])for f in schema.fields if f.name != field_to_drop],
)
def drop_nested_field(
df: DataFrame,
field_to_drop: str,
) -> DataFrame:
if '.' in field_to_drop:
root, subfield = field_to_drop.split('.', maxsplit=1)
field_to_drop_from = next(f for f in df.schema.fields if f.name == root)
return df.withColumn(root, _drop_nested_field(
schema=field_to_drop_from.dataType,
field_to_drop=subfield,
parents=[root]
))
else:
return df.drop(field_to_drop)
df = drop_nested_field(df, 'a.b.c.d')
Библиотека Make Structs Easy* упрощает выполнение таких операций, как добавление, удаление и переименование полей внутри вложенных структур данных. Библиотека доступна как на Scala, так и на Python.
Предположим, у вас есть следующие данные:
import org.apache.spark.sql.functions._
case class Features(feat1: String, feat2: String, feat3: String)
case class Record(features: Features, arrayOfFeatures: Seq[Features])
val df = Seq(
Record(Features("hello", "world", "!"), Seq(Features("red", "orange", "yellow"), Features("green", "blue", "indigo")))
).toDF
df.printSchema
// root
// |-- features: struct (nullable = true)
// | |-- feat1: string (nullable = true)
// | |-- feat2: string (nullable = true)
// | |-- feat3: string (nullable = true)
// |-- arrayOfFeatures: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- feat1: string (nullable = true)
// | | |-- feat2: string (nullable = true)
// | | |-- feat3: string (nullable = true)
df.show(false)
// +-----------------+----------------------------------------------+
// |features |arrayOfFeatures |
// +-----------------+----------------------------------------------+
// |[hello, world, !]|[[red, orange, yellow], [green, blue, indigo]]|
// +-----------------+----------------------------------------------+
Затем отбрасывая feat2
от features
так же просто, как:
import com.github.fqaiser94.mse.methods._
// drop feat2 from features
df.withColumn("features", $"features".dropFields("feat2")).show(false)
// +----------+----------------------------------------------+
// |features |arrayOfFeatures |
// +----------+----------------------------------------------+
// |[hello, !]|[[red, orange, yellow], [green, blue, indigo]]|
// +----------+----------------------------------------------+
Я заметил, что было много последующих комментариев по другим решениям с вопросом, есть ли способ удалить столбец, вложенный внутри структуры, вложенной внутри массива. Это можно сделать, объединив функции, предоставляемые библиотекой Make Structs Easy, с функциями, предоставляемыми библиотекой spark-hofs, следующим образом:
import za.co.absa.spark.hofs._
// drop feat2 in each element of arrayOfFeatures
df.withColumn("arrayOfFeatures", transform($"arrayOfFeatures", features => features.dropFields("feat2"))).show(false)
// +-----------------+--------------------------------+
// |features |arrayOfFeatures |
// +-----------------+--------------------------------+
// |[hello, world, !]|[[red, yellow], [green, indigo]]|
// +-----------------+--------------------------------+
* Полное раскрытие информации: я являюсь автором библиотеки Make Structs Easy, на которую имеется ссылка в этом ответе.
Со Spark 3.1+, коротко и эффективно:
object DatasetOps {
implicit class DatasetOps[T](val dataset: Dataset[T]) {
def dropFields(fieldNames: String*): DataFrame =
fieldNames.foldLeft(dataset.toDF()) { (dataset, fieldName) =>
val subFieldRegex = "(\\w+)\\.(.+)".r
fieldName match {
case subFieldRegex(columnName, subFieldPath) =>
dataset.withColumn(columnName, col(columnName).dropFields(subFieldPath))
case _ => dataset.drop(fieldName)
}
}
}
}
Это также сохраняет требуемое или не логическое значение в схеме.
Использование:
dataset.dropFields("some_column", "some_struct.some_sub_field.some_field")
Добавление версии java. Решение для этого.
Служебный класс (передайте свой набор данных и вложенный столбец, который нужно отбросить, в функцию dropNestedColumn).
(В ответе Лиора Чаги есть несколько ошибок, я исправил их, пока пытался использовать его ответ).
public class NestedColumnActions {
/*
dataset : dataset in which we want to drop columns
columnName : nested column that needs to be deleted
*/
public static Dataset<?> dropNestedColumn(Dataset<?> dataset, String columnName) {
//Special case of top level column deletion
if(!columnName.contains("."))
return dataset.drop(columnName);
final DataSetModifier dataFrameFolder = new DataSetModifier(dataset);
Arrays.stream(dataset.schema().fields())
.flatMap(f -> {
//If the column name to be deleted starts with current top level column
if (columnName.startsWith(f.name() + DOT)) {
//Get new column structure under f , expected after deleting the required column
final Optional<Column> column = dropSubColumn(functions.col(f.name()), f.dataType(), f.name(), columnName);
if (column.isPresent()) {
return Stream.of(new Tuple2<>(f.name(), column));
} else {
return Stream.empty();
}
} else {
return Stream.empty();
}
})
//Call accept function with Tuples of (top level column name, new column structure under it)
.forEach(colTuple -> dataFrameFolder.accept(colTuple));
return dataFrameFolder.getDataset();
}
private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
Optional<Column> column = Optional.empty();
if (!fullColumnName.equals(dropColumnName)) {
if (colType instanceof StructType) {
if (dropColumnName.startsWith(fullColumnName + DOT)) {
column = Optional.of(functions.struct(getColumns(col, (StructType) colType, fullColumnName, dropColumnName)));
}
else {
column = Optional.of(col);
}
} else {
column = Optional.of(col);
}
}
return column;
}
private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
return Arrays.stream(colType.fields())
.flatMap(f -> {
final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
fullColumnName + "." + f.name(), dropColumnName);
if (column.isPresent()) {
return Stream.of(column.get().alias(f.name()));
} else {
return Stream.empty();
}
}
).toArray(Column[]::new);
}
private static class DataSetModifier implements Consumer<Tuple2<String, Optional<Column>>> {
private Dataset<?> df;
public DataSetModifier(Dataset<?> df) {
this.df = df;
}
public Dataset<?> getDataset() {
return df;
}
/*
colTuple[0]:top level column name
colTuple[1]:new column structure under it
*/
@Override
public void accept(Tuple2<String, Optional<Column>> colTuple) {
if (!colTuple._2().isPresent()) {
df = df.drop(colTuple._1());
} else {
df = df.withColumn(colTuple._1(), colTuple._2().get());
}
}
}
}