Исключение исключения при использовании filter() в map()
Я пытаюсь использовать filter() внутри map(), но я получаю это исключение искры:
Преобразования и действия СДР могут вызываться только драйвером, но не внутри других преобразований; например, rdd1.map(x => rdd2.values.count() * x) недопустим, поскольку преобразование значений и действие count не могут быть выполнены внутри преобразования rdd1.map. Для получения дополнительной информации см. SPARK-5063.
я знаю, что spark не допускает вложенные преобразования / действия /RDD, поэтому любой может дать мне совет, как сделать это альтернативно (без вложенных преобразований или действий), ну, у меня есть RDD, его кортежи похожи на:
JavaRDD< String[]> RDD
я пытаюсь сопоставить его, давая ему список в качестве аргумента, этот список содержит javaPairRDDs такие:
List<JavaPairRDD<String,String>> list
JavaRDD< String[]> result = RDD.map(new modifyRDD(list));
эти строки ссылаются на функцию modifyRDD ():
public static class modifyRDD implements Function <String[], String[]> {
List<JavaPairRDD<String,String>> list;
public modifyRDD (List<JavaPairRDD<String,String>> list ){ this.list=list;}
public String [] call(String[] t) {
String[] s = t;
for (int i = 0; i < NB_TD; i++) {
int j=i;
// select the appropriate RDD from the RDDs_list to the current index
JavaPairRDD<String,String> rdd_i = list.get(i);
String previousElement=s[j];
JavaPairRDD<String,String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));
String newElement=currentRDD.first()._2();
s[j]=newElement;
}
return (s) ;
}
}
Итак, проблема в этой линии
JavaPairRDD<String,String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));
Теперь я приведу пример, предположим, что список содержит 2 PairRDDs
list={PairRDD1={(a,b)(c,d)},PairRDD2={(u,v)(x,y)}..}
и мой RDD, который я хочу отобразить, содержит:
JavaRDD< String[]> RDD = {[a,u],[c,x],[a,x].....}
результат, который я хочу получить после map():
JavaRDD< String[]> result = {[b,v],[d,y],[b,y].....}
1 ответ
Я изменил тип списка с List> на List>> list, чтобы избежать работы с RDD внутри map(), теперь у меня нет исключений (конечно, у меня нет вложенных преобразований), но я не уверен в этом если мой новый код эффективен, bcz Список> велик, и для поиска элемента я использовал цикл "для" (означает, что мне нужно развернуть весь Список>, чтобы получить нужный элемент), поэтому я прошу вас как экспертов дать мне замечания об этом (используя цикл для), и дать предложение, чтобы улучшить его. благодарю вас
это функция map () после модификации
public static class modifyRDD implements Function <String[], String[]> {
List<List<Tuple2<String,String>>> list;
public modifyRDD (List<List<Tuple2<String,String>>> list ){ this.list=list;}
public String [] call(String[] t) {
String[] s = t;
for (int i = 0; i < NB_TD; i++) {
// select the appropriate lookup_list
List<Tuple2<String,String>> list_i = list.get(i);
String previousElement=s[i];
String newElement="";
for (int k = 0; k < list_i.size(); k++){
Tuple2<String,String> sk1 = list_i.get(k);
if (sk1._1.equals( previousElement)){ newElement=sk1._2;}
}
s[i]= newElement;
}
return(s);
}