Обнаружение аномалий с помощью PCA в Spark

Я прочитал следующую статью

Обнаружение аномалий с помощью анализа главных компонентов (PCA)

В статье написано следующее:

• Алгоритм PCA в основном преобразует показания данных из существующей системы координат в новую систему координат.

• Чем ближе показания данных к центру новой системы координат, тем ближе эти показания к оптимальному значению.

• Оценка аномалии рассчитывается с использованием расстояния Махаланобиса между показаниями и средним значением всех показаний, которое является центром преобразованной системы координат.

Может ли кто-нибудь описать мне более подробно об обнаружении аномалий с помощью PCA (используя оценки PCA и расстояние Махаланобиса)? Я запутался, потому что определение PCA таково: PCA - это статистическая процедура, которая использует ортогональное преобразование для преобразования набора наблюдений возможных коррелированных переменных в набор значений линейно некоррелированных переменных ". Как использовать расстояние Махаланобиса, когда нет больше корреляции между переменными?

Кто-нибудь может объяснить мне, как это сделать в Spark? Функция pca.transform возвращает счет, где я должен вычислить расстояние Махаланобиса для каждого чтения к центру?

1 ответ

Решение

Предположим, у вас есть набор данных из трехмерных точек. Каждая точка имеет координаты (x, y, z), Те (x, y, z) являются размерами. Точка представлена ​​тремя значениями, например (8, 7, 4), Это называется входной вектор.

Применяя алгоритм PCA, вы в основном преобразуете свой входной вектор в новый вектор. Это может быть представлено как функция, которая превращается (x, y, z) => (v, w).

Пример: (8, 7, 4) => (-4, 13)

Теперь вы получили вектор, более короткий (вы уменьшили число измерений), но ваша точка все еще имеет координаты, а именно (v, w), Это означает, что вы можете вычислить расстояние между двумя точками, используя меру Махаланобиса. Точки, которые имеют большое расстояние от средней координаты, на самом деле являются аномалиями.

Пример решения:

import breeze.linalg.{DenseVector, inv}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{PCA, StandardScaler, VectorAssembler}
import org.apache.spark.ml.linalg.{Matrix, Vector}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._

object SparkApp extends App {
  val session = SparkSession.builder()
    .appName("spark-app").master("local[*]").getOrCreate()
  session.sparkContext.setLogLevel("ERROR")
  import session.implicits._

  val df = Seq(
    (1, 4, 0),
    (3, 4, 0),
    (1, 3, 0),
    (3, 3, 0),
    (67, 37, 0) //outlier
  ).toDF("x", "y", "z")
  val vectorAssembler = new VectorAssembler().setInputCols(Array("x", "y", "z")).setOutputCol("vector")
  val standardScalar = new StandardScaler().setInputCol("vector").setOutputCol("normalized-vector").setWithMean(true)
    .setWithStd(true)

  val pca = new PCA().setInputCol("normalized-vector").setOutputCol("pca-features").setK(2)

  val pipeline = new Pipeline().setStages(
    Array(vectorAssembler, standardScalar, pca)
  )

  val pcaDF = pipeline.fit(df).transform(df)

  def withMahalanobois(df: DataFrame, inputCol: String): DataFrame = {
    val Row(coeff1: Matrix) = Correlation.corr(df, inputCol).head

    val invCovariance = inv(new breeze.linalg.DenseMatrix(2, 2, coeff1.toArray))

    val mahalanobois = udf[Double, Vector] { v =>
      val vB = DenseVector(v.toArray)
      vB.t * invCovariance * vB
    }

    df.withColumn("mahalanobois", mahalanobois(df(inputCol)))
  }

  val withMahalanobois: DataFrame = withMahalanobois(pcaDF, "pca-features")

  session.close()
}
Другие вопросы по тегам