Обнаружение аномалий с помощью PCA в Spark
Я прочитал следующую статью
Обнаружение аномалий с помощью анализа главных компонентов (PCA)
В статье написано следующее:
• Алгоритм PCA в основном преобразует показания данных из существующей системы координат в новую систему координат.
• Чем ближе показания данных к центру новой системы координат, тем ближе эти показания к оптимальному значению.
• Оценка аномалии рассчитывается с использованием расстояния Махаланобиса между показаниями и средним значением всех показаний, которое является центром преобразованной системы координат.
Может ли кто-нибудь описать мне более подробно об обнаружении аномалий с помощью PCA (используя оценки PCA и расстояние Махаланобиса)? Я запутался, потому что определение PCA таково: PCA - это статистическая процедура, которая использует ортогональное преобразование для преобразования набора наблюдений возможных коррелированных переменных в набор значений линейно некоррелированных переменных ". Как использовать расстояние Махаланобиса, когда нет больше корреляции между переменными?
Кто-нибудь может объяснить мне, как это сделать в Spark? Функция pca.transform возвращает счет, где я должен вычислить расстояние Махаланобиса для каждого чтения к центру?
1 ответ
Предположим, у вас есть набор данных из трехмерных точек. Каждая точка имеет координаты (x, y, z)
, Те (x, y, z)
являются размерами. Точка представлена тремя значениями, например (8, 7, 4)
, Это называется входной вектор.
Применяя алгоритм PCA, вы в основном преобразуете свой входной вектор в новый вектор. Это может быть представлено как функция, которая превращается (x, y, z) => (v, w).
Пример: (8, 7, 4) => (-4, 13)
Теперь вы получили вектор, более короткий (вы уменьшили число измерений), но ваша точка все еще имеет координаты, а именно (v, w)
, Это означает, что вы можете вычислить расстояние между двумя точками, используя меру Махаланобиса. Точки, которые имеют большое расстояние от средней координаты, на самом деле являются аномалиями.
Пример решения:
import breeze.linalg.{DenseVector, inv}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{PCA, StandardScaler, VectorAssembler}
import org.apache.spark.ml.linalg.{Matrix, Vector}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
object SparkApp extends App {
val session = SparkSession.builder()
.appName("spark-app").master("local[*]").getOrCreate()
session.sparkContext.setLogLevel("ERROR")
import session.implicits._
val df = Seq(
(1, 4, 0),
(3, 4, 0),
(1, 3, 0),
(3, 3, 0),
(67, 37, 0) //outlier
).toDF("x", "y", "z")
val vectorAssembler = new VectorAssembler().setInputCols(Array("x", "y", "z")).setOutputCol("vector")
val standardScalar = new StandardScaler().setInputCol("vector").setOutputCol("normalized-vector").setWithMean(true)
.setWithStd(true)
val pca = new PCA().setInputCol("normalized-vector").setOutputCol("pca-features").setK(2)
val pipeline = new Pipeline().setStages(
Array(vectorAssembler, standardScalar, pca)
)
val pcaDF = pipeline.fit(df).transform(df)
def withMahalanobois(df: DataFrame, inputCol: String): DataFrame = {
val Row(coeff1: Matrix) = Correlation.corr(df, inputCol).head
val invCovariance = inv(new breeze.linalg.DenseMatrix(2, 2, coeff1.toArray))
val mahalanobois = udf[Double, Vector] { v =>
val vB = DenseVector(v.toArray)
vB.t * invCovariance * vB
}
df.withColumn("mahalanobois", mahalanobois(df(inputCol)))
}
val withMahalanobois: DataFrame = withMahalanobois(pcaDF, "pca-features")
session.close()
}