Фильтрация СДР по количеству вхождений

У меня есть СДР рейтингов продуктов с использованием объекта MLlib Rating, который является просто кортежем (int userId, int productId, double rating). Я хочу удалить любой элемент из СДР, являющийся обзором продукта со слишком низким рейтингом.

Например, СДР может быть таким:

Rating(35, 1, 5.0)
Rating(18, 1, 4.0)
Rating(29, 2, 3.0)
Rating(12, 2, 2.0)
Rating(65, 3, 1.0)

и если бы я отфильтровал это, чтобы удалить любой продукт с менее чем 2 отзывами, он просто отфильтровал бы последний рейтинг и вернул бы первые четыре. (Я хочу фильтровать с более высоким минимальным количеством просмотров, чем 2, но только для примера).

В настоящее время у меня есть этот код, который выводит последовательность идентификаторов продуктов в порядке количества оценок, но я не был уверен в способе фильтрации из основного RDD на основе этого, и он все равно кажется неэффективным:

val mostRated = ratings.map(_._2.product)
                       .countByValue
                       .toSeq
                       .sortBy(- _._2)
                       .map(_._1)

1 ответ

Вы можете сгруппировать rdd по ProductId и затем отфильтровать его, основываясь на том, превышает ли длина группы пороговое значение (1 здесь). Используйте flatMap для извлечения результатов из сгруппированного rdd:

case class Rating(UserId: Int, ProductId: Int, Rating: Double)

val ratings = sc.parallelize(Seq(Rating(35, 1, 5.0),
    Rating(18, 1, 4.0),
    Rating(29, 2, 3.0),
    Rating(12, 2, 2.0),
    Rating(65, 3, 1.0)))

val prodMinCounts = ratings.groupBy(_.ProductId).
                            filter(_._2.toSeq.length > 1).
                            flatMap(_._2)
prodMinCounts.collect
// res14: Array[Rating] = Array(Rating(35,1,5.0), Rating(18,1,4.0), Rating(29,2,3.0), Rating(12,2,2.0))
Другие вопросы по тегам