Механизм рекомендаций, использующий Apache Spark MLIB, отображающий нулевые рекомендации после обработки всех операций

Я новичок, когда дело доходит до реализации алгоритмов ML. Я хотел реализовать рекомендацию Engine и после небольшого эксперимента узнал, что совместная фильтрация может использоваться для одного и того же. Я использую Apache Spark для того же. Я получил помощь от одного из блогов и попытался реализовать то же самое на моем местном. Код PFB, который я опробовал. Каждый раз, когда я выполняю это, количество рекомендаций, которое печатается, всегда равно нулю. Я не вижу очевидных ошибок как таковых. Может кто-нибудь, пожалуйста, помогите мне понять это. Также, пожалуйста, не стесняйтесь предоставить любую другую ссылку, которая может быть передана в этом отношении.

package mllib.example;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;

import scala.Tuple2;


public class RecommendationEngine {

    public static void main(String[] args) {


        // Create Java spark context
        SparkConf conf = new SparkConf().setAppName("Recommendation System Example").setMaster("local[2]").set("spark.executor.memory","1g");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read user-item rating file. format - userId,itemId,rating
        JavaRDD<String> userItemRatingsFile = sc.textFile(args[0]);

        System.out.println("Count is "+userItemRatingsFile.count());

        // Read item description file. format - itemId, itemName, Other Fields,..
        JavaRDD<String> itemDescritpionFile = sc.textFile(args[1]);

        System.out.println("itemDescritpionFile Count is "+itemDescritpionFile.count());

        // Map file to Ratings(user,item,rating) tuples
        JavaRDD<Rating> ratings = userItemRatingsFile.map(new Function<String, Rating>() {
            public Rating call(String s) {
                String[] sarray = s.split(",");
                return new Rating(Integer.parseInt(sarray[0]), Integer
                        .parseInt(sarray[1]), Double.parseDouble(sarray[2]));
            }
        });

        System.out.println("Ratings RDD Object"+ratings.first().toString());

        // Create tuples(itemId,ItemDescription), will be used later to get names of item from itemId
        JavaPairRDD<Integer,String> itemDescritpion = itemDescritpionFile.mapToPair(
                new PairFunction<String, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(String t) throws Exception {
                String[] s = t.split(",");
                return new Tuple2<Integer,String>(Integer.parseInt(s[0]), s[1]);
            }
        });

        System.out.println("itemDescritpion RDD Object"+ratings.first().toString());

        // Build the recommendation model using ALS

        int rank = 10; // 10 latent factors
        int numIterations = Integer.parseInt(args[2]); // number of iterations

        MatrixFactorizationModel model = ALS.trainImplicit(JavaRDD.toRDD(ratings),
                rank, numIterations);
        //ALS.trainImplicit(arg0, arg1, arg2)

        // Create user-item tuples from ratings
        JavaRDD<Tuple2<Object, Object>> userProducts = ratings
                .map(new Function<Rating, Tuple2<Object, Object>>() {
                    public Tuple2<Object, Object> call(Rating r) {
                        return new Tuple2<Object, Object>(r.user(), r.product());
                    }
                });

        // Calculate the itemIds not rated by a particular user, say user with userId = 1
        JavaRDD<Integer> notRatedByUser = userProducts.filter(new Function<Tuple2<Object,Object>, Boolean>() {
            @Override
            public Boolean call(Tuple2<Object, Object> v1) throws Exception {
                if (((Integer) v1._1).intValue() != 0) {
                    return true;
                }
                return false;
            }
        }).map(new Function<Tuple2<Object,Object>, Integer>() {
            @Override
            public Integer call(Tuple2<Object, Object> v1) throws Exception {
                return (Integer) v1._2;
            }
        });

        // Create user-item tuples for the items that are not rated by user, with user id 1
        JavaRDD<Tuple2<Object, Object>> itemsNotRatedByUser = notRatedByUser
                .map(new Function<Integer, Tuple2<Object, Object>>() {
                    public Tuple2<Object, Object> call(Integer r) {
                        return new Tuple2<Object, Object>(0, r);
                    }
        });

        // Predict the ratings of the items not rated by user for the user
        JavaRDD<Rating> recomondations = model.predict(itemsNotRatedByUser.rdd()).toJavaRDD().distinct();

        // Sort the recommendations by rating in descending order
        recomondations = recomondations.sortBy(new Function<Rating,Double>(){
            @Override
            public Double call(Rating v1) throws Exception {
                return v1.rating();
            }

        }, false, 1);   

        System.out.println("recomondations Total is "+recomondations.count());

        // Get top 10 recommendations
        JavaRDD<Rating> topRecomondations = sc.parallelize(recomondations.take(10));

        // Join top 10 recommendations with item descriptions
        JavaRDD<Tuple2<Rating, String>> recommendedItems = topRecomondations.mapToPair(
                new PairFunction<Rating, Integer, Rating>() {
            @Override
            public Tuple2<Integer, Rating> call(Rating t) throws Exception {
                return new Tuple2<Integer,Rating>(t.product(),t);
            }
        }).join(itemDescritpion).values();

        System.out.println("recommendedItems count is "+recommendedItems.count());

        //Print the top recommendations for user 1.
        recommendedItems.foreach(new VoidFunction<Tuple2<Rating,String>>() {
            @Override
            public void call(Tuple2<Rating, String> t) throws Exception {
                System.out.println(t._1.product() + "\t" + t._1.rating() + "\t" + t._2);
            }
        });

Кроме того, я вижу, что эта работа выполняется очень долго. Каждый раз, когда он создает модель. Есть способ, которым я могу создать модель один раз, сохранить ее и загрузить ее для последовательных прогнозов. Можем ли мы случайно повысить скорость выполнения этой работы?

Заранее спасибо

0 ответов