StandardScaler в Spark не работает должным образом

Любая идея, почему спарк будет делать это для StandardScaler? Согласно определению StandardScaler:

StandardScaler стандартизирует набор функций, чтобы иметь нулевое среднее значение и стандартное отклонение 1. Флаг withStd будет масштабировать данные до стандартного отклонения, а флаг withMean (по умолчанию false) будет центрировать данные перед масштабированием.

>>> tmpdf.show(4)
+----+----+----+------------+
|int1|int2|int3|temp_feature|
+----+----+----+------------+
|   1|   2|   3|       [2.0]|
|   7|   8|   9|       [8.0]|
|   4|   5|   6|       [5.0]|
+----+----+----+------------+

>>> sScaler = StandardScaler(withMean=True, withStd=True).setInputCol("temp_feature")
>>> sScaler.fit(tmpdf).transform(tmpdf).show()
+----+----+----+------------+-------------------------------------------+
|int1|int2|int3|temp_feature|StandardScaler_4fe08ca180ab163e4120__output|
+----+----+----+------------+-------------------------------------------+
|   1|   2|   3|       [2.0]|                                     [-1.0]|
|   7|   8|   9|       [8.0]|                                      [1.0]|
|   4|   5|   6|       [5.0]|                                      [0.0]|
+----+----+----+------------+-------------------------------------------+

В мире обломков

>>> x
array([2., 8., 5.])
>>> (x - x.mean())/x.std()
array([-1.22474487,  1.22474487,  0.        ])

В склеарн мире

>>> scaler = StandardScaler(with_mean=True, with_std=True)
>>> data
[[2.0], [8.0], [5.0]]
>>> print(scaler.fit(data).transform(data))
[[-1.22474487]
 [ 1.22474487]
 [ 0.        ]]

1 ответ

Решение

Причина того, что ваши результаты не соответствуют ожиданиям, заключается в том, что pyspark.ml.feature.StandardScalerиспользует стандартное отклонение несмещенной выборки вместо стандартного отклонения популяции.

Из документов:

"Единица измерения стандартного отклонения" вычисляется с использованием скорректированного стандартного отклонения выборки, которое вычисляется как квадратный корень несмещенной выборочной дисперсии.

Если бы вы попробовали numpy код с примером стандартного отклонения, вы увидите те же результаты:

import numpy as np

x = np.array([2., 8., 5.])
print((x - x.mean())/x.std(ddof=1))
#array([-1.,  1.,  0.])

С точки зрения моделирования, это почти наверняка не проблема (если только ваши данные не составляют всю совокупность, что практически никогда не бывает). Также имейте в виду, что для больших размеров выборки стандартное отклонение выборки приближается к стандартному отклонению популяции. Поэтому, если в вашем DataFrame много строк, разница здесь будет незначительной.


Однако, если вы настаивали на том, чтобы ваш скейлер использовал стандартное отклонение совокупности, один "хакерский" способ - добавить в ваш DataFrame строку, которая является средним значением столбцов.

Напомним, что стандартное отклонение определяется как квадратный корень из суммы квадратов разностей со средним. Или как функция:

# using the same x as above
def popstd(x): 
    return np.sqrt(sum((xi - x.mean())**2/len(x) for xi in x))

print(popstd(x))
#2.4494897427831779

print(x.std())
#2.4494897427831779

Разница при использовании объективного стандартного отклонения заключается в том, что вы просто делите на len(x)-1 вместо len(x), Поэтому, если вы добавите выборку, равную среднему значению, вы увеличите знаменатель, не влияя на общее среднее значение.

Предположим, у вас был следующий DataFrame:

df = spark.createDataFrame(
    np.array(range(1,10,1)).reshape(3,3).tolist(),
    ["int1", "int2", "int3"]
)
df.show()
#+----+----+----+
#|int1|int2|int3|
#+----+----+----+
#|   1|   2|   3|
#|   4|   5|   6|
#|   7|   8|   9|
#+----+----+----+

Объедините этот DataFrame со средним значением для каждого столбца:

import pyspark.sql.functions as f
# This is equivalent to UNION ALL in SQL
df2 = df.union(df.select(*[f.avg(c).alias(c) for c in df.columns]))

Теперь масштабируйте ваши значения:

from pyspark.ml.feature import VectorAssembler, StandardScaler
va = VectorAssembler(inputCols=["int2"], outputCol="temp_feature")

tmpdf = va.transform(df2)
sScaler = StandardScaler(
    withMean=True, withStd=True, inputCol="temp_feature", outputCol="scaled"
)
sScaler.fit(tmpdf).transform(tmpdf).show()
#+----+----+----+------------+---------------------+
#|int1|int2|int3|temp_feature|scaled               |
#+----+----+----+------------+---------------------+
#|1.0 |2.0 |3.0 |[2.0]       |[-1.2247448713915892]|
#|4.0 |5.0 |6.0 |[5.0]       |[0.0]                |
#|7.0 |8.0 |9.0 |[8.0]       |[1.2247448713915892] |
#|4.0 |5.0 |6.0 |[5.0]       |[0.0]                |
#+----+----+----+------------+---------------------+
Другие вопросы по тегам