Пример классификации Spark Multiclass
Ребята, вы знаете, где можно найти примеры мультиклассовой классификации в Spark? Я потратил много времени на поиск в книгах и в Интернете, и до сих пор я просто знаю, что это возможно с последней версии в соответствии с документацией.
2 ответа
ML
(Рекомендуется в Spark 2.0+)
Мы будем использовать те же данные, что и в MLlib ниже. Есть два основных варианта. Если Estimator
поддерживает мультиклассовую классификацию из коробки (например, случайный лес), которую вы можете использовать напрямую:
val trainRawDf = trainRaw.toDF
import org.apache.spark.ml.feature.{Tokenizer, CountVectorizer, StringIndexer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
val transformers = Array(
new StringIndexer().setInputCol("group").setOutputCol("label"),
new Tokenizer().setInputCol("text").setOutputCol("tokens"),
new CountVectorizer().setInputCol("tokens").setOutputCol("features")
)
val rf = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
val model = new Pipeline().setStages(transformers :+ rf).fit(trainRawDf)
model.transform(trainRawDf)
Если модель поддерживает только двоичную классификацию (логистическая регрессия) и расширяет o.a.s.ml.classification.Classifier
Вы можете использовать стратегию "один против отдыха":
import org.apache.spark.ml.classification.OneVsRest
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
val ovr = new OneVsRest().setClassifier(lr)
val ovrModel = new Pipeline().setStages(transformers :+ ovr).fit(trainRawDf)
MLLib
Согласно официальной документации на данный момент (MLlib 1.6.0) следующие методы поддерживают мультиклассовую классификацию:
- логистическая регрессия,
- деревья решений,
- случайные леса,
- наивный байесовский
По крайней мере, некоторые из примеров используют мультиклассовую классификацию:
- Наивный байесовский пример - 3 класса
- Логистическая регрессия - 10 классов для классификатора, хотя только 2 в данных примера
Общая структура, игнорирующая аргументы, специфичные для метода, почти такая же, как и для всех других методов в MLlib. Вы должны предварительно обработать ваш ввод, чтобы создать любой фрейм данных со столбцами, представляющими label
а также features
:
root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)
или же RDD[LabeledPoint]
,
Spark предоставляет широкий спектр полезных инструментов, предназначенных для облегчения этого процесса, включая средства извлечения функций и преобразования функций и конвейеры.
Ниже вы найдете довольно наивный пример использования Random Forest.
Сначала позволяет импортировать необходимые пакеты и создавать фиктивные данные:
import sqlContext.implicits._
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
case class LabeledRecord(group: String, text: String)
val trainRaw = sc.parallelize(
LabeledRecord("foo", "foo v a y b foo") ::
LabeledRecord("bar", "x bar y bar v") ::
LabeledRecord("bar", "x a y bar z") ::
LabeledRecord("foobar", "foo v b bar z") ::
LabeledRecord("foo", "foo x") ::
LabeledRecord("foobar", "z y x foo a b bar v") ::
Nil
)
Теперь давайте определимся с необходимыми трансформаторами и технологическим составом. Dataset
:
// Tokenizer to process text fields
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
// HashingTF to convert tokens to the feature vector
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("features")
.setNumFeatures(10)
// Indexer to convert String labels to Double
val indexer = new StringIndexer()
.setInputCol("group")
.setOutputCol("label")
.fit(trainRaw.toDF)
def transfom(rdd: RDD[LabeledRecord]) = {
val tokenized = tokenizer.transform(rdd.toDF)
val hashed = hashingTF.transform(tokenized)
val indexed = indexer.transform(hashed)
indexed
.select($"label", $"features")
.map{case Row(label: Double, features: Vector) =>
LabeledPoint(label, features)}
}
val train: RDD[LabeledPoint] = transfom(trainRaw)
Обратите внимание, что indexer
"вписывается" в данные поезда. Это просто означает, что категориальные значения, используемые в качестве меток, преобразуются в doubles
, Чтобы использовать классификатор для новых данных, вы должны сначала преобразовать их, используя эту indexer
,
Далее мы можем обучить модель RF:
val numClasses = 3
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 10
val featureSubsetStrategy = "auto"
val impurity = "gini"
val maxDepth = 4
val maxBins = 16
val model = RandomForest.trainClassifier(
train, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity,
maxDepth, maxBins
)
и, наконец, проверить это:
val testRaw = sc.parallelize(
LabeledRecord("foo", "foo foo z z z") ::
LabeledRecord("bar", "z bar y y v") ::
LabeledRecord("bar", "a a bar a z") ::
LabeledRecord("foobar", "foo v b bar z") ::
LabeledRecord("foobar", "a foo a bar") ::
Nil
)
val test: RDD[LabeledPoint] = transfom(testRaw)
val predsAndLabs = test.map(lp => (model.predict(lp.features), lp.label))
val metrics = new MulticlassMetrics(predsAndLabs)
metrics.precision
metrics.recall
Вы используете Spark 1.6, а не Spark 2.1? Я думаю, что проблема в том, что в spark 2.1 метод transform возвращает набор данных, который может быть неявно преобразован в типизированный RDD, где, как и до этого, он возвращает фрейм или строку данных.
Попробуйте в качестве диагностики, указав тип возврата функции преобразования в виде RDD[LabeledPoint], и посмотрите, не возникает ли такая же ошибка.