OutOfBoundsException с ALS - Flink MLlib
Я делаю систему рекомендаций для фильмов, используя наборы данных MovieLens, доступные здесь: http://grouplens.org/datasets/movielens/
Чтобы вычислить эту систему рекомендаций, я использую библиотеку ML Flink в Scala и, в частности, алгоритм ALS (org.apache.flink.ml.recommendation.ALS
).
Я сначала сопоставляю рейтинги фильма в DataSet[(Int, Int, Double)]
а затем создать trainingSet
и testSet
(см. код ниже).
Моя проблема в том, что при использовании ALS.fit
работать со всем набором данных (со всеми оценками), но если я удаляю только одну оценку, функция подгонки больше не работает, и я не понимаю, почему.
У Вас есть какие-либо идеи?:)
Используемый код:
Rating.scala
case class Rating(userId: Int, movieId: Int, rating: Double)
PreProcessing.scala
object PreProcessing {
def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
env.readCsvFile[(Int, Int, Double)](
ratingsPath, ignoreFirstLine = true,
includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}
Processing.scala
object Processing {
private val ratingsPath: String = "Path_to_ratings.csv"
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first(ratings.count().toInt)
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
}
}
"Но если я просто уберу только один рейтинг"
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first((ratings.count()-1).toInt)
Ошибка:
19.06.2015 15:00:24 CoGroup (CoGroup на org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) переключен на FAILED
java.lang.ArrayIndexOutOfBoundsException: 5
в org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
в org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)
в org.apache.flink.runtime.operators.CoGroupDriver.run (CoGroupDriver.java:152)
...
1 ответ
Проблема заключается в first
оператор в сочетании с setTemporaryPath
параметр Флинка ALS
реализация. Чтобы понять проблему, позвольте мне быстро объяснить, как работает алгоритм блокировки ALS.
Реализация блокировки чередующихся наименьших квадратов сначала разбивает заданную матрицу оценок по пользователям и по элементам на блоки. Для этих блоков рассчитывается информация о маршруте. Эта информация о маршруте говорит, какой пользователь / блок элементов получает входные данные от какого элемента / пользовательского блока соответственно. После этого начинается итерация ALS.
Поскольку базовый механизм выполнения Flink представляет собой механизм потока данных с параллельным потоком, он пытается выполнить как можно больше частей потока данных конвейерным способом. Это требует одновременной работы всех операторов конвейера. Преимущество этого состоит в том, что Флинк избегает получения промежуточных результатов, которые могут быть чрезмерно большими. Недостатком является то, что доступная память должна быть распределена между всеми работающими операторами. В случае БАС, где размер человека DataSet
элементы (например, блоки user / item) довольно большие, это нежелательно.
Чтобы решить эту проблему, не все операторы реализации выполняются одновременно, если вы установили temporaryPath
, Путь определяет, где промежуточные результаты могут быть сохранены. Таким образом, если вы определили временный путь, то ALS
сначала вычисляет информацию о маршрутизации для пользовательских блоков и записывает их на диск, затем вычисляет информацию о маршрутизации для блоков элементов и записывает их на диск и, наконец, запускает итерацию ALS, для которой она считывает информацию о маршрутизации из временного пути,
Вычисление информации о маршруте для пользовательских блоков и блоков элементов зависит от заданного набора данных оценок. В вашем случае, когда вы вычисляете информацию о маршруте пользователя, он сначала прочитает набор данных рейтингов и применит first
оператор на это. first
оператор возвращает n
произвольные элементы из базового набора данных. Проблема сейчас в том, что Flink не хранит результат этого first
операция для расчета информации о маршруте товара. Вместо этого, когда вы начнете вычислять информацию о маршруте предметов, Flink будет повторно выполнять поток данных, начиная с его источников. Это означает, что он считывает данные рейтинга с диска и применяет first
оператор на это снова. Это даст вам во многих случаях другой набор рейтингов по сравнению с результатом первого first
операция. Следовательно, сгенерированная информация о маршрутизации противоречива и ALS
выходит из строя.
Вы можете обойти проблему, материализовав результат first
оператор и использовать этот результат в качестве входа для ALS
алгоритм. Предмет FlinkMLTools
содержит метод persist
который занимает DataSet
, записывает его по указанному пути, а затем возвращает новый DataSet
который читает только что написанное DataSet
, Это позволяет разбить получившийся график потока данных.
val firstTrainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.first((ratings.count()-1).toInt)
val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS/")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
Кроме того, вы можете попытаться покинуть temporaryPath
снята с охраны. Затем все этапы (расчет информации о маршруте и итерация) выполняются конвейерным способом. Это означает, что для расчета информации о маршрутизации пользователя и элемента используется один и тот же набор входных данных, который получается из first
оператор.
В настоящее время сообщество Flink работает над сохранением промежуточных результатов операторов в памяти. Это позволит закрепить результат first
оператор, так что он не будет рассчитан дважды и, таким образом, не дает разных результатов из-за его недетерминированной природы.