Фильтрация СДР по количеству вхождений
У меня есть СДР рейтингов продуктов с использованием объекта MLlib Rating, который является просто кортежем (int userId, int productId, double rating). Я хочу удалить любой элемент из СДР, являющийся обзором продукта со слишком низким рейтингом.
Например, СДР может быть таким:
Rating(35, 1, 5.0)
Rating(18, 1, 4.0)
Rating(29, 2, 3.0)
Rating(12, 2, 2.0)
Rating(65, 3, 1.0)
и если бы я отфильтровал это, чтобы удалить любой продукт с менее чем 2 отзывами, он просто отфильтровал бы последний рейтинг и вернул бы первые четыре. (Я хочу фильтровать с более высоким минимальным количеством просмотров, чем 2, но только для примера).
В настоящее время у меня есть этот код, который выводит последовательность идентификаторов продуктов в порядке количества оценок, но я не был уверен в способе фильтрации из основного RDD на основе этого, и он все равно кажется неэффективным:
val mostRated = ratings.map(_._2.product)
.countByValue
.toSeq
.sortBy(- _._2)
.map(_._1)
1 ответ
Вы можете сгруппировать rdd по ProductId и затем отфильтровать его, основываясь на том, превышает ли длина группы пороговое значение (1 здесь). Используйте flatMap для извлечения результатов из сгруппированного rdd:
case class Rating(UserId: Int, ProductId: Int, Rating: Double)
val ratings = sc.parallelize(Seq(Rating(35, 1, 5.0),
Rating(18, 1, 4.0),
Rating(29, 2, 3.0),
Rating(12, 2, 2.0),
Rating(65, 3, 1.0)))
val prodMinCounts = ratings.groupBy(_.ProductId).
filter(_._2.toSeq.length > 1).
flatMap(_._2)
prodMinCounts.collect
// res14: Array[Rating] = Array(Rating(35,1,5.0), Rating(18,1,4.0), Rating(29,2,3.0), Rating(12,2,2.0))