Как изменить типы столбцов в DataFrame Spark SQL?
Предположим, я делаю что-то вроде:
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()
root
|-- year: string (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)
df.show()
year make model comment blank
2012 Tesla S No comment
1997 Ford E350 Go get one now th...
но я действительно хотел year
как Int
(и, возможно, преобразовать некоторые другие столбцы).
Лучшее, что я мог придумать, это
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]
что немного запутано.
Я из R, и я привык писать, например
df2 <- df %>%
mutate(year = year %>% as.integer,
make = make %>% toupper)
Я, вероятно, что-то упускаю, так как должен быть лучший способ сделать это в spark/scala...
16 ответов
Начиная с версии Spark 1.4 вы можете применить метод приведения с DataType к столбцу:
import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
.drop("year")
.withColumnRenamed("yearTmp", "year")
Если вы используете выражения SQL, вы также можете сделать:
val df2 = df.selectExpr("cast(year as int) year",
"make",
"model",
"comment",
"blank")
Для получения дополнительной информации проверьте документы: http://spark.apache.org/docs/1.6.0/api/scala/
[EDIT: март 2016 года: спасибо за голоса! Хотя на самом деле это не самый лучший ответ, я думаю, что решения, основанные на withColumn
, withColumnRenamed
а также cast
выдвинутые msemelman, Martin Senne и др. проще и чище].
Я думаю, что ваш подход в порядке, напомним, что Spark DataFrame
является (неизменяемым) RDD Rows, поэтому мы никогда не заменяем столбец, а просто создаем новый DataFrame
каждый раз с новой схемой.
Предполагая, что у вас есть оригинальный df со следующей схемой:
scala> df.printSchema
root
|-- Year: string (nullable = true)
|-- Month: string (nullable = true)
|-- DayofMonth: string (nullable = true)
|-- DayOfWeek: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Distance: string (nullable = true)
|-- CRSDepTime: string (nullable = true)
И некоторые UDF определены в одном или нескольких столбцах:
import org.apache.spark.sql.functions._
val toInt = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )
val days_since_nearest_holidays = udf(
(year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
)
Изменение типов столбцов или даже создание нового DataFrame из другого можно записать так:
val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour", toHour(df("CRSDepTime")))
.withColumn("dayOfWeek", toInt(df("DayOfWeek")))
.withColumn("dayOfMonth", toInt(df("DayofMonth")))
.withColumn("month", toInt(df("Month")))
.withColumn("distance", toDouble(df("Distance")))
.withColumn("nearestHoliday", days_since_nearest_holidays(
df("Year"), df("Month"), df("DayofMonth"))
)
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
"month", "distance", "nearestHoliday")
который дает:
scala> df.printSchema
root
|-- departureDelay: double (nullable = true)
|-- departureHour: integer (nullable = true)
|-- dayOfWeek: integer (nullable = true)
|-- dayOfMonth: integer (nullable = true)
|-- month: integer (nullable = true)
|-- distance: double (nullable = true)
|-- nearestHoliday: integer (nullable = true)
Это довольно близко к вашему собственному решению. Просто сохраняя изменения типа и другие преобразования как отдельные udf val
делает код более читабельным и многократно используемым.
Как cast
операция доступна для Spark Column
х (а как лично я не одобряю udf
как предложено @Svend
на данный момент), как насчет:
df.select( df("year").cast(IntegerType).as("year"), ... )
привести к запрошенному типу? В качестве аккуратного побочного эффекта, значения, не подлежащие преобразованию / преобразованию в этом смысле, станут null
,
Если вам нужно это как вспомогательный метод, используйте:
object DFHelper{
def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
df.withColumn( cn, df(cn).cast(tpe) )
}
}
который используется как:
import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
Во-первых, если вы хотите сыграть тип
import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))
С тем же именем столбца столбец будет заменен новым, вам не нужно добавлять и удалять.
Во-вторых, о Scala vs R. код Scala, наиболее похожий на R, который я могу достичь:
val df2 = df.select(
df.columns.map {
case year @ "year" => df(year).cast(IntegerType).as(year)
case make @ "make" => functions.upper(df(make)).as(make)
case other => df(other)
}: _*
)
Хотя длина немного длиннее, чем у R. Обратите внимание, что mutate
это функция для фрейма данных R, поэтому Scala достаточно хорош для выразительной мощности, передаваемой без использования специальной функции.
(df.columns
на удивление Array[String] вместо Array[Column], возможно, они хотят, чтобы он выглядел как датафрейм Python-панд.)
Ты можешь использовать selectExpr
чтобы сделать его немного чище:
df.selectExpr("cast(year as int) as year", "upper(make) as make",
"model", "comment", "blank")
Java-код для изменения типа данных DataFrame с String на Integer
df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
Он просто приведёт существующий (тип данных String) к Integer.
Я думаю, что для меня это намного удобнее.
import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))
Это преобразует ваш столбец года в IntegerType
с созданием любых временных столбцов и удалением этих столбцов. Если вы хотите преобразовать в любой другой тип данных, вы можете проверить типы внутриorg.apache.spark.sql.types
пакет.
Чтобы преобразовать год из строки в int, вы можете добавить следующую опцию в программу чтения csv: "inferSchema" -> "true", см. Документацию DataBricks.
Генерация простого набора данных, содержащего пять значений, и преобразование int
в string
тип:
val df = spark.range(5).select( col("id").cast("string") )
Так что это действительно работает, только если у вас возникли проблемы с сохранением в драйвер jdbc, такой как sqlserver, но это действительно полезно для ошибок, с которыми вы столкнетесь с синтаксисом и типами.
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
// case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
}
}
JdbcDialects.registerDialect(SQLServerDialect)
Ответы, предлагающие использовать cast, FYI, метод cast в spark 1.4.1 не работает.
например, кадр данных со строковым столбцом, имеющим значение "8182175552014127960" при приведении к значению bigint, имеет значение "8182175552014128100"
df.show
+-------------------+
| a|
+-------------------+
|8182175552014127960|
+-------------------+
df.selectExpr("cast(a as bigint) a").show
+-------------------+
| a|
+-------------------+
|8182175552014128100|
+-------------------+
Нам пришлось столкнуться с множеством проблем, прежде чем найти эту ошибку, потому что у нас были колонки bigint в производстве.
Вы можете использовать приведенный ниже код.
df.withColumn("year", df("year").cast(IntegerType))
Который будет конвертировать столбец года в IntegerType
колонка.
Используя Spark Sql 2.4.0, вы можете сделать это:
spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
Another solution is as follows:
1) Keep "inferSchema" as False
2) While running 'Map' functions on the row, you can read 'asString' (row.getString...)
<Code>
//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
.read()
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema","false")
.load(args[0]);
JavaRDD<Box> vertices = enginesDataSet
.select("BOX","BOX_CD")
.toJavaRDD()
.map(new Function<Row, Box>() {
@Override
public Box call(Row row) throws Exception {
return new Box((String)row.getString(0),(String)row.get(1));
}
});
</Code>
Почему бы просто не сделать, как описано в http://spark.apache.org/docs/latest/api/python/pyspark.sql.html
df.select(df.year.cast("int"),"make","model","comment","blank")
Так много ответов и не так много подробных объяснений
Следующий синтаксис работает с использованием записной книжки Databricks со Spark 2.4
from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(BLDFm["LOAD_DATE"], "MM-dd-yyyy"))
Обратите внимание, что вы должны указать формат записи, который у вас есть (в моем случае "MM-dd-yyyy"), и импорт является обязательным, поскольку to_date - это искровая функция sql
Также пробовал этот синтаксис, но вместо правильного приведения получил нули:
df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))
(Обратите внимание, что мне пришлось использовать скобки и кавычки, чтобы это было синтаксически правильным)
PS: Я должен признать, что это похоже на синтаксические джунгли, есть много возможных способов точек входа, а в официальных ссылках на API отсутствуют подходящие примеры.
Этот метод удалит старый столбец и создаст новые столбцы с такими же значениями и новым типом данных. Мои оригинальные типы данных при создании DataFrame были:-
root
|-- id: integer (nullable = true)
|-- flag1: string (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag3: string (nullable = true)
После этого я запустил следующий код, чтобы изменить тип данных:-
df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)
После этого мой результат оказался:
root
|-- id: integer (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag1: boolean (nullable = true)
|-- flag3: boolean (nullable = true)
По-другому:
// Generate a simple dataset containing five values and convert int to string type
val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
Можно изменить тип данных столбца, используя приведение в spark sql. имя таблицы - таблица, и она имеет только два столбца: столбец1 и столбец2 и тип данных столбца1 должны быть изменены. ex-spark.sql("выберите приведение (column1 как Double) column1NewName,column2 из таблицы") Вместо двойного запишите свой тип данных.
Если вам нужно переименовать десятки столбцов, заданных их именами, в следующем примере используется подход @dnlbrky и он применяется к нескольким столбцам одновременно:
df.selectExpr(df.columns.map(cn => {
if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
else cn
}):_*)
Неклассированные столбцы остаются без изменений. Все столбцы остаются в исходном порядке.
В случае, если вы хотите заменить несколько столбцов одного типа на другой без указания имен отдельных столбцов
/* Get names of all columns that you want to change type.
In this example I want to change all columns of type Array to String*/
val arrColsNames = originalDataFrame.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)
//iterate columns you want to change type and cast to the required type
val updatedDataFrame = arrColsNames.foldLeft(originalDataFrame){(tempDF, colName) => tempDF.withColumn(colName, tempDF.col(colName).cast(DataTypes.StringType))}
//display
updatedDataFrame.show(truncate = false)
val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
//Schema to be applied to the table
val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)
val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()