OutOfBoundsException с ALS - Flink MLlib

Я делаю систему рекомендаций для фильмов, используя наборы данных MovieLens, доступные здесь: http://grouplens.org/datasets/movielens/

Чтобы вычислить эту систему рекомендаций, я использую библиотеку ML Flink в Scala и, в частности, алгоритм ALS (org.apache.flink.ml.recommendation.ALS).

Я сначала сопоставляю рейтинги фильма в DataSet[(Int, Int, Double)] а затем создать trainingSet и testSet (см. код ниже).

Моя проблема в том, что при использовании ALS.fit работать со всем набором данных (со всеми оценками), но если я удаляю только одну оценку, функция подгонки больше не работает, и я не понимаю, почему.

У Вас есть какие-либо идеи?:)

Используемый код:

Rating.scala

case class Rating(userId: Int, movieId: Int, rating: Double)

PreProcessing.scala

object PreProcessing {

def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
      env.readCsvFile[(Int, Int, Double)](
      ratingsPath, ignoreFirstLine = true,
      includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}

Processing.scala

object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)

    val trainingSet : DataSet[(Int, Int, Double)] =
    ratings
     .map(r => (r.userId, r.movieId, r.rating))
     .sortPartition(0, Order.ASCENDING)
     .first(ratings.count().toInt)

    val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(150)
     .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
     .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
     .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}

"Но если я просто уберу только один рейтинг"

val trainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .sortPartition(0, Order.ASCENDING)
    .first((ratings.count()-1).toInt)

Ошибка:

19.06.2015 15:00:24 CoGroup (CoGroup на org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) переключен на FAILED

java.lang.ArrayIndexOutOfBoundsException: 5

в org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)

в org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)

в org.apache.flink.runtime.operators.CoGroupDriver.run (CoGroupDriver.java:152)

...

1 ответ

Решение

Проблема заключается в first оператор в сочетании с setTemporaryPath параметр Флинка ALS реализация. Чтобы понять проблему, позвольте мне быстро объяснить, как работает алгоритм блокировки ALS.

Реализация блокировки чередующихся наименьших квадратов сначала разбивает заданную матрицу оценок по пользователям и по элементам на блоки. Для этих блоков рассчитывается информация о маршруте. Эта информация о маршруте говорит, какой пользователь / блок элементов получает входные данные от какого элемента / пользовательского блока соответственно. После этого начинается итерация ALS.

Поскольку базовый механизм выполнения Flink представляет собой механизм потока данных с параллельным потоком, он пытается выполнить как можно больше частей потока данных конвейерным способом. Это требует одновременной работы всех операторов конвейера. Преимущество этого состоит в том, что Флинк избегает получения промежуточных результатов, которые могут быть чрезмерно большими. Недостатком является то, что доступная память должна быть распределена между всеми работающими операторами. В случае БАС, где размер человека DataSet элементы (например, блоки user / item) довольно большие, это нежелательно.

Чтобы решить эту проблему, не все операторы реализации выполняются одновременно, если вы установили temporaryPath, Путь определяет, где промежуточные результаты могут быть сохранены. Таким образом, если вы определили временный путь, то ALS сначала вычисляет информацию о маршрутизации для пользовательских блоков и записывает их на диск, затем вычисляет информацию о маршрутизации для блоков элементов и записывает их на диск и, наконец, запускает итерацию ALS, для которой она считывает информацию о маршрутизации из временного пути,

Вычисление информации о маршруте для пользовательских блоков и блоков элементов зависит от заданного набора данных оценок. В вашем случае, когда вы вычисляете информацию о маршруте пользователя, он сначала прочитает набор данных рейтингов и применит first оператор на это. first оператор возвращает nпроизвольные элементы из базового набора данных. Проблема сейчас в том, что Flink не хранит результат этого first операция для расчета информации о маршруте товара. Вместо этого, когда вы начнете вычислять информацию о маршруте предметов, Flink будет повторно выполнять поток данных, начиная с его источников. Это означает, что он считывает данные рейтинга с диска и применяет first оператор на это снова. Это даст вам во многих случаях другой набор рейтингов по сравнению с результатом первого first операция. Следовательно, сгенерированная информация о маршрутизации противоречива и ALS выходит из строя.

Вы можете обойти проблему, материализовав результат first оператор и использовать этот результат в качестве входа для ALS алгоритм. Предмет FlinkMLTools содержит метод persist который занимает DataSet, записывает его по указанному пути, а затем возвращает новый DataSet который читает только что написанное DataSet, Это позволяет разбить получившийся график потока данных.

val firstTrainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .first((ratings.count()-1).toInt)

val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS/")

val parameters = ParameterMap()
  .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
  .add(ALS.Seed, 42L)

als.fit(trainingSet, parameters)

Кроме того, вы можете попытаться покинуть temporaryPath снята с охраны. Затем все этапы (расчет информации о маршруте и итерация) выполняются конвейерным способом. Это означает, что для расчета информации о маршрутизации пользователя и элемента используется один и тот же набор входных данных, который получается из first оператор.

В настоящее время сообщество Flink работает над сохранением промежуточных результатов операторов в памяти. Это позволит закрепить результат first оператор, так что он не будет рассчитан дважды и, таким образом, не дает разных результатов из-за его недетерминированной природы.

Другие вопросы по тегам