Spark, DataFrame: применить преобразователь / оценщик к группам
У меня есть DataFrame, который выглядит следующим образом:
+-----------+-----+------------+
| userID|group| features|
+-----------+-----+------------+
|12462563356| 1| [5.0,43.0]|
|12462563701| 2| [1.0,8.0]|
|12462563701| 1| [2.0,12.0]|
|12462564356| 1| [1.0,1.0]|
|12462565487| 3| [2.0,3.0]|
|12462565698| 2| [1.0,1.0]|
|12462565698| 1| [1.0,1.0]|
|12462566081| 2| [1.0,2.0]|
|12462566081| 1| [1.0,15.0]|
|12462566225| 2| [1.0,1.0]|
|12462566225| 1| [9.0,85.0]|
|12462566526| 2| [1.0,1.0]|
|12462566526| 1| [3.0,79.0]|
|12462567006| 2| [11.0,15.0]|
|12462567006| 1| [10.0,15.0]|
|12462567006| 3| [10.0,15.0]|
|12462586595| 2| [2.0,42.0]|
|12462586595| 3| [2.0,16.0]|
|12462589343| 3| [1.0,1.0]|
+-----------+-----+------------+
Где типы столбцов: userID: Long, group: Int, и features:vector.
Это уже сгруппированный DataFrame, т.е. идентификатор пользователя появится в определенной группе не более одного раза.
Моя цель - масштабировать features
столбец на группу.
Есть ли способ применить преобразователь функций (в моем случае я хотел бы применить StandardScaler) для каждой группы вместо применения его ко всему DataFrame.
PS использование ML не является обязательным, поэтому нет проблем, если решение основано на MLlib.
1 ответ
Вы можете вычислять статистику по группам, используя практически тот же код, что и по умолчанию Scaler
:
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
// Compute Multivariate Statistics
val summary = data.select($"group", $"features")
.rdd
.map {
case Row(group: Int, features: Vector) => (group, features)
}
.aggregateByKey(new MultivariateOnlineSummarizer)(/* Create an empty new MultivariateOnlineSummarizer */
(agg, v) => agg.add(v), /* seqOp : Add a new sample Vector to this summarizer, and update the statistical summary. */
(agg1, agg2) => agg1.merge(agg2)) /* combOp : As MultivariateOnlineSummarizer accepts a merge action with another MultivariateOnlineSummarizer, and update the statistical summary. */
.mapValues {
s => (
s.variance.toArray.map(math.sqrt(_)), /* compute the square root variance for each key */
s.mean.toArray /* fetch the mean for each key */
)
}.collectAsMap
Если ожидаемое количество групп относительно мало, вы можете транслировать это:
val summaryBd = sc.broadcast(summary)
и преобразуйте ваши данные:
val scaledRows = df.map{ case Row(userID, group: Int, features: Vector) =>
val (stdev, mean) = summaryBd.value(group)
val vs = features.toArray.clone()
for (i <- 0 until vs.size) {
vs(i) = if(stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1 / stdev(i))
}
Row(userID, group, Vectors.dense(vs))
}
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema)
В противном случае вы можете просто присоединиться. Не должно быть сложно обернуть это как трансформер ML с групповым столбцом в качестве параметра.