Исключение исключения при использовании filter() в map()

Я пытаюсь использовать filter() внутри map(), но я получаю это исключение искры:

Преобразования и действия СДР могут вызываться только драйвером, но не внутри других преобразований; например, rdd1.map(x => rdd2.values.count() * x) недопустим, поскольку преобразование значений и действие count не могут быть выполнены внутри преобразования rdd1.map. Для получения дополнительной информации см. SPARK-5063.

я знаю, что spark не допускает вложенные преобразования / действия /RDD, поэтому любой может дать мне совет, как сделать это альтернативно (без вложенных преобразований или действий), ну, у меня есть RDD, его кортежи похожи на:

 JavaRDD< String[]> RDD

я пытаюсь сопоставить его, давая ему список в качестве аргумента, этот список содержит javaPairRDDs такие:

List<JavaPairRDD<String,String>> list
JavaRDD< String[]> result = RDD.map(new modifyRDD(list));

эти строки ссылаются на функцию modifyRDD ():

public static class modifyRDD implements Function <String[], String[]> { 

    List<JavaPairRDD<String,String>> list;
    public modifyRDD (List<JavaPairRDD<String,String>> list ){ this.list=list;}

    public String [] call(String[] t) {

          String[] s = t;

          for (int i = 0; i < NB_TD; i++) {         
            int j=i; 
         // select the appropriate RDD from the RDDs_list to the current index 

            JavaPairRDD<String,String> rdd_i = list.get(i);
            String previousElement=s[j];

           JavaPairRDD<String,String> currentRDD =  rdd_i.filter(line -> line._1().equals(previousElement));

           String newElement=currentRDD.first()._2();   

           s[j]=newElement;
                }

          return   (s) ;

    }


    }

Итак, проблема в этой линии

  JavaPairRDD<String,String> currentRDD =  rdd_i.filter(line -> line._1().equals(previousElement));

Теперь я приведу пример, предположим, что список содержит 2 PairRDDs

list={PairRDD1={(a,b)(c,d)},PairRDD2={(u,v)(x,y)}..}

и мой RDD, который я хочу отобразить, содержит:

 JavaRDD< String[]> RDD = {[a,u],[c,x],[a,x].....}

результат, который я хочу получить после map():

 JavaRDD< String[]> result = {[b,v],[d,y],[b,y].....}

1 ответ

Я изменил тип списка с List> на List>> list, чтобы избежать работы с RDD внутри map(), теперь у меня нет исключений (конечно, у меня нет вложенных преобразований), но я не уверен в этом если мой новый код эффективен, bcz Список> велик, и для поиска элемента я использовал цикл "для" (означает, что мне нужно развернуть весь Список>, чтобы получить нужный элемент), поэтому я прошу вас как экспертов дать мне замечания об этом (используя цикл для), и дать предложение, чтобы улучшить его. благодарю вас

это функция map () после модификации

  public static class modifyRDD implements Function <String[], String[]> { 

    List<List<Tuple2<String,String>>> list;
    public modifyRDD (List<List<Tuple2<String,String>>> list ){ this.list=list;}

    public String [] call(String[] t) {

          String[] s = t;

          for (int i = 0; i < NB_TD; i++) {         

         // select the appropriate lookup_list 

            List<Tuple2<String,String>> list_i = list.get(i);
            String previousElement=s[i];
            String newElement="";

            for (int k = 0; k < list_i.size(); k++){

            Tuple2<String,String> sk1 = list_i.get(k);
            if (sk1._1.equals( previousElement)){  newElement=sk1._2;}

            }


           s[i]= newElement;
                }
         return(s);
                                   }
Другие вопросы по тегам