Нет ли метода "обратного преобразования" для такого скейлера, как MinMaxScaler в spark?
При обучении модели, скажем, линейной регрессии, мы можем сделать нормализацию, такую как MinMaxScaler, для поезда набором тестовых данных.
После того, как мы получили обученную модель и используем ее для составления прогнозов и масштабирования прогнозов до исходного представления.
В питоне есть метод "обратного преобразования". Например:
from sklearn.preprocessing import MinMaxScaler
scalerModel.inverse_transform
from sklearn.preprocessing import MinMaxScaler
data = [[-1, 2], [-0.5, 6], [0, 10], [1, 18]]
scaler = MinMaxScaler()
MinMaxScaler(copy=True, feature_range=(0, 1))
print(data)
dataScaled = scaler.fit(data).transform(data)
print(dataScaled)
scaler.inverse_transform(dataScaled)
Есть ли подобный метод в искре?
Я много гуглил, но ответа не нашел. Кто-нибудь может дать мне несколько советов? Большое спасибо!
1 ответ
В нашей компании, чтобы решить ту же проблему на StandardScaler, мы расширили spark.ml следующим образом:
package org.apache.spark.ml
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.util.Identifiable
package object feature {
implicit class RichStandardScalerModel(model: StandardScalerModel) {
private def invertedStdDev(sigma: Double): Double = 1 / sigma
private def invertedMean(mu: Double, sigma: Double): Double = -mu / sigma
def inverse(newOutputCol: String): StandardScalerModel = {
val sigma: linalg.Vector = model.std
val mu: linalg.Vector = model.mean
val newSigma: linalg.Vector = new DenseVector(sigma.toArray.map(invertedStdDev))
val newMu: linalg.Vector = new DenseVector(mu.toArray.zip(sigma.toArray).map { case (m, s) => invertedMean(m, s) })
val inverted: StandardScalerModel = new StandardScalerModel(Identifiable.randomUID("stdScal"), newSigma, newMu)
.setInputCol(model.getOutputCol)
.setOutputCol(newOutputCol)
inverted
.set(inverted.withMean, model.getWithMean)
.set(inverted.withStd, model.getWithStd)
}
}
}
Это должно быть довольно легко изменить или сделать что-то подобное для вашего конкретного случая.
Имейте в виду, что из-за двойной реализации JVM вы обычно теряете точность в этих операциях, поэтому вы не восстановите точные исходные значения, которые вы имели до преобразования (например: вы, вероятно, получите что-то вроде 1.9999999999999998 вместо 2.0).
Здесь нет прямого решения. Поскольку передача массива в UDF может быть выполнена только тогда, когда массив является столбцом (горит (массив) не поможет), я использую следующий обходной путь.
Одним словом, он превращает массив перевернутых весов в строку, передает его в UDF и решает математику.
Вы можете использовать этот масштабированный массив (строку) в обратной функции (также прикрепленной здесь), чтобы получить инвертированные значения.
Код:
from pyspark.ml.feature import VectorAssembler, QuantileDiscretizer
from pyspark.ml.linalg import SparseVector, DenseVector, Vectors, VectorUDT
df = spark.createDataFrame([
(0, 1, 0.5, -1),
(1, 2, 1.0, 1),
(2, 4, 10.0, 2)
], ["id", 'x1', 'x2', 'x3'])
df.show()
def Normalize(df):
scales = df.describe()
scales = scales.filter("summary = 'mean' or summary = 'stddev'")
scales = scales.select(["summary"] + [col(c).cast("double") for c in scales.columns[1:]])
assembler = VectorAssembler(
inputCols=scales.columns[1:],
outputCol="X_scales")
df_scales = assembler.transform(scales)
x_mean = df_scales.filter("summary = 'mean'").select('X_scales')
x_std = df_scales.filter("summary = 'stddev'").select('X_scales')
ks_std_lit = lit('|'.join([str(s) for s in list(x_std.collect()[0].X_scales)]))
ks_mean_lit = lit('|'.join([str(s) for s in list(x_mean.collect()[0].X_scales)]))
assembler = VectorAssembler(
inputCols=df.columns[0:4],
outputCol="features")
df_features = assembler.transform(df)
df_features = df_features.withColumn('Scaled', exec_norm_udf(df_features.features, ks_mean_lit, ks_std_lit))
return df_features, ks_mean_lit, ks_std_lit
def exec_norm(vector, x_mean, x_std):
x_mean = [float(s) for s in x_mean.split('|')]
x_std = [float(s) for s in x_std.split('|')]
res = (np.array(vector) - np.array(x_mean)) / np.array(x_std)
res = list(res)
return Vectors.dense(res)
exec_norm_udf = udf(exec_norm, VectorUDT())
def scaler_invert(vector, x_mean, x_std):
x_mean = [float(s) for s in x_mean.split('|')]
x_std = [float(s) for s in x_std.split('|')]
res = (np.array(vector) * np.array(x_std)) + np.array(x_mean)
res = list(res)
return Vectors.dense(res)
scaler_invert_udf = udf(scaler_invert, VectorUDT())
df, scaler_mean, scaler_std = Normalize(df)
df.withColumn('inverted', scaler_invert_udf(df.Scaled, scaler_mean, scaler_std)).show(truncate=False)
Возможно, я слишком опоздал на вечеринку, однако недавно столкнулся с той же проблемой и не смог найти никакого жизнеспособного решения.
Предполагая, что автору этого вопроса не нужно инвертировать значения MinMax векторов, вместо этого необходимо инвертировать только один столбец. Также известны минимальные и максимальные значения столбца, а также минимальные и максимальные параметры масштабатора.
Математика MinMaxScaler согласно сайту scikit learn:
X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
X_scaled = X_std * (max - min) + min
Формула MinMaxScaler "обратного проектирования"
X_scaled = (X - Xmin) / (Xmax) - Xmin) * (max - min) + min
X = (max * Xmin - min * Xmax - Xmin * X_scaled + Xmax * X_scaled)/(max - min)
Реализация
from sklearn.preprocessing import MinMaxScaler
import pandas
data = [[-1, 2], [-0.5, 6], [0, 10], [1, 18]]
scaler = MinMaxScaler(copy=True, feature_range=(0, 1))
print(data)
dataScaled = scaler.fit(data).transform(data)
data_sp = spark.createDataFrame(pandas.DataFrame(data, columns=["x", "y"]).join(pandas.DataFrame(dataScaled, columns=["x_scaled", "y_scaled"])))
data_sp.show()
print("Inversing column: y_scaled")
Xmax = data_sp.select("y").rdd.max()[0]
Xmin = data_sp.select("y").rdd.min()[0]
_max = scaler.feature_range[1]
_min = scaler.feature_range[0]
print("Xmax =", Xmax, "Xmin =", Xmin, "max =", _max, "min =", _min)
data_sp.withColumn(colName="y_scaled_inversed", col=(_max * Xmin - _min * Xmax - Xmin * data_sp.y_scaled + Xmax * data_sp.y_scaled)/(_max - _min)).show()
Выходы
[[-1, 2], [-0.5, 6], [0, 10], [1, 18]]
+----+---+--------+--------+
| x| y|x_scaled|y_scaled|
+----+---+--------+--------+
|-1.0| 2| 0.0| 0.0|
|-0.5| 6| 0.25| 0.25|
| 0.0| 10| 0.5| 0.5|
| 1.0| 18| 1.0| 1.0|
+----+---+--------+--------+
Inversing column: y_scaled
Xmax = 18 Xmin = 2 max = 1 min = 0
+----+---+--------+--------+-----------------+
| x| y|x_scaled|y_scaled|y_scaled_inversed|
+----+---+--------+--------+-----------------+
|-1.0| 2| 0.0| 0.0| 2.0|
|-0.5| 6| 0.25| 0.25| 6.0|
| 0.0| 10| 0.5| 0.5| 10.0|
| 1.0| 18| 1.0| 1.0| 18.0|
+----+---+--------+--------+-----------------+