Spark Scala: Как преобразовать Dataframe[вектор] в DataFrame[f1:Double, ..., fn: Double)]

Я просто использовал Standard Scaler для нормализации своих функций для приложения ML. После выбора масштабированных объектов я хочу преобразовать это обратно в массив данных Double, хотя длина моих векторов произвольна. Я знаю, как сделать это для конкретных 3 функций с помощью

myDF.map{case Row(v: Vector) => (v(0), v(1), v(2))}.toDF("f1", "f2", "f3")

но не для произвольного количества функций. Есть простой способ сделать это?

Пример:

val testDF = sc.parallelize(List(Vectors.dense(5D, 6D, 7D), Vectors.dense(8D, 9D, 10D), Vectors.dense(11D, 12D, 13D))).map(Tuple1(_)).toDF("scaledFeatures")
val myColumnNames = List("f1", "f2", "f3")
// val finalDF = DataFrame[f1: Double, f2: Double, f3: Double] 

РЕДАКТИРОВАТЬ

Я узнал, как распаковывать имена столбцов при создании кадра данных, но все еще возникают проблемы с преобразованием вектора в последовательность, необходимую для создания кадра данных:

finalDF = testDF.map{case Row(v: Vector) => v.toArray.toSeq /* <= this errors */}.toDF(List("f1", "f2", "f3"): _*)

2 ответа

Решение

Одним из возможных подходов является что-то похожее на это

import org.apache.spark.sql.functions.udf
import org.apache.spark.mllib.linalg.Vector

// Get size of the vector
val n = testDF.first.getAs[org.apache.spark.mllib.linalg.Vector](0).size

// Simple helper to convert vector to array<double> 
val vecToSeq = udf((v: Vector) => v.toArray)

// Prepare a list of columns to create
val exprs = (0 until n).map(i => $"_tmp".getItem(i).alias(s"f$i"))

testDF.select(vecToSeq($"scaledFeatures").alias("_tmp")).select(exprs:_*)

Если вы знаете список столбцов заранее, вы можете немного упростить это:

val cols: Seq[String] = ???
val exprs = cols.zipWithIndex.map{ case (c, i) => $"_tmp".getItem(i).alias(c) }

Эквивалент Python см. В разделе Как разбить вектор на столбцы с помощью PySpark.

Попробуйте VectorSlicer:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((1, 0.2, 0.8), (2, 0.1, 0.9), (3, 0.3, 0.7))
).toDF("id", "negative_logit", "positive_logit")


val assembler = new VectorAssembler()
  .setInputCols(Array("negative_logit", "positive_logit"))
  .setOutputCol("prediction")

val output = assembler.transform(dataset)
output.show()
/*
+---+--------------+--------------+----------+
| id|negative_logit|positive_logit|prediction|
+---+--------------+--------------+----------+
|  1|           0.2|           0.8| [0.2,0.8]|
|  2|           0.1|           0.9| [0.1,0.9]|
|  3|           0.3|           0.7| [0.3,0.7]|
+---+--------------+--------------+----------+
*/

val slicer = new VectorSlicer()
.setInputCol("prediction")
.setIndices(Array(1))
.setOutputCol("positive_prediction")

val posi_output = slicer.transform(output)
posi_output.show()

/*
+---+--------------+--------------+----------+-------------------+
| id|negative_logit|positive_logit|prediction|positive_prediction|
+---+--------------+--------------+----------+-------------------+
|  1|           0.2|           0.8| [0.2,0.8]|              [0.8]|
|  2|           0.1|           0.9| [0.1,0.9]|              [0.9]|
|  3|           0.3|           0.7| [0.3,0.7]|              [0.7]|
+---+--------------+--------------+----------+-------------------+
*/

Альтернативное решение, появившееся пару дней назад: импортируйте VectorDisassembler в ваш проект (если он не объединен с Spark), теперь:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 1.2, 1.3), (1, 2.2, 2.3), (2, 3.2, 3.3))
).toDF("id", "val1", "val2")


val assembler = new VectorAssembler()
  .setInputCols(Array("val1", "val2"))
  .setOutputCol("vectorCol")

val output = assembler.transform(dataset)
output.show()
/*
+---+----+----+---------+
| id|val1|val2|vectorCol|
+---+----+----+---------+
|  0| 1.2| 1.3|[1.2,1.3]|
|  1| 2.2| 2.3|[2.2,2.3]|
|  2| 3.2| 3.3|[3.2,3.3]|
+---+----+----+---------+*/

val disassembler = new org.apache.spark.ml.feature.VectorDisassembler()
  .setInputCol("vectorCol")
disassembler.transform(output).show()
/*
+---+----+----+---------+----+----+
| id|val1|val2|vectorCol|val1|val2|
+---+----+----+---------+----+----+
|  0| 1.2| 1.3|[1.2,1.3]| 1.2| 1.3|
|  1| 2.2| 2.3|[2.2,2.3]| 2.2| 2.3|
|  2| 3.2| 3.3|[3.2,3.3]| 3.2| 3.3|
+---+----+----+---------+----+----+*/

Я использую Spark 2.3.2 и построил модель бинарной классификации xgboost4j, результат выглядит так :

results_train.select("classIndex","probability","prediction").show(3,0)
+----------+----------------------------------------+----------+
|classIndex|probability                             |prediction|
+----------+----------------------------------------+----------+
|1         |[0.5998525619506836,0.400147408246994]  |0.0       |
|1         |[0.5487841367721558,0.45121586322784424]|0.0       |
|0         |[0.5555324554443359,0.44446757435798645]|0.0       |

Я определяю следующий udf, чтобы получить элементы из вероятности векторного столбца

import org.apache.spark.sql.functions._

def getProb = udf((probV: org.apache.spark.ml.linalg.Vector, clsInx: Int) => probV.apply(clsInx) )

results_train.select("classIndex","probability","prediction").
withColumn("p_0",getProb($"probability",lit(0))).
withColumn("p_1",getProb($"probability", lit(1))).show(3,0)

+----------+----------------------------------------+----------+------------------+-------------------+
|classIndex|probability                             |prediction|p_0               |p_1                |
+----------+----------------------------------------+----------+------------------+-------------------+
|1         |[0.5998525619506836,0.400147408246994]  |0.0       |0.5998525619506836|0.400147408246994  |
|1         |[0.5487841367721558,0.45121586322784424]|0.0       |0.5487841367721558|0.45121586322784424|
|0         |[0.5555324554443359,0.44446757435798645]|0.0       |0.5555324554443359|0.44446757435798645|

Надеюсь, это поможет тем, кто работает с вводом типа Vector.

Поскольку для приведенных выше ответов требуются дополнительные библиотеки или они все еще не поддерживаются, я использовал фрейм данных pandas, чтобы легко извлечь векторные значения, а затем преобразовать их обратно в искровой фрейм данных.

# convert to pandas dataframe 
pandasDf = dataframe.toPandas()
# add a new column
pandasDf['newColumnName'] = 0 # filled the new column with 0s
# now iterate through the rows and update the column
for index, row in pandasDf.iterrows():
   value = row['vectorCol'][0] # get the 0th value of the vector
   pandasDf.loc[index, 'newColumnName'] = value # put the value in the new column
Другие вопросы по тегам