Преобразовать Java-Pair-Rdd в Rdd
Мне нужно преобразовать мою Java-pair-rdd в CSV:
поэтому я думаю преобразовать его в rdd, чтобы решить мою проблему.
я хочу, чтобы мой rdd был преобразован из:
Key Value
Jack [a,b,c]
к:
Key value
Jack a
Jack b
Jack c
я вижу, что это возможно в этой проблеме и в этой проблеме ( PySpark: преобразовать пару RDD обратно в обычную RDD), поэтому я спрашиваю, как это сделать в Java?
Обновление вопроса
Тип моего JavaPairRdd имеет тип:
JavaPairRDD<Tuple2<String,String>, Iterable<Tuple1<String>>>
и это форма строки, которая содержит:
((dr5rvey,dr5ruku),[(2,01/09/2013 00:09,01/09/2013 00:27,N,1,-73.9287262,40.75831223,-73.98726654,40.76442719,2,3.96,16,0.5,0.5,4.25,0,,21.25,1,)])
ключ здесь: (dr5rvey,dr5ruku)
и значение [(2,01/09/2013 00:09,01/09/2013 00:27,N,1,-73.9287262,40.75831223,-73.98726654,40.76442719,2,3.96,16,0.5,0.5,4.25,0,,21.25,1,)]
мой оригинальный JavaRdd был типа:
JavaRDD<String>
3 ответа
Понимая, что ключи должны быть сохранены, вы можете использовать функцию flatMapValues:
Передайте каждое значение в паре ключ-значение RDD через функцию flatMap без изменения ключей; ...
JavaPairRDD<Tuple2<String,String>, Iterable<Tuple1<String>>> input = ...;
JavaPairRDD<Tuple2<String, String>, Tuple1<String>> output1 = input.flatMapValues(iter -> iter);
JavaPairRDD<Tuple2<String, String>, String> output2 = output1.mapValues(t1 -> t1._1());
Если я правильно понимаю, вам нужно использовать функцию плоской карты, она позволяет вам создавать несколько строк из одного ключа, например, в Scala(просто идея, которую вам нужно изменить для вашего случая использования):
rdd.flatMap(arg0 => {
var list = List[Row]()
list = arg0._2.split(",")
list
})
Это супер упрощенный пример, но вы должны понять суть.
для rdd:
key val
mykey "a,b,c'
Возвращенный RDD будет:
key val
mykey "a"
mykey "b"
mykey "c"
Тип вашего RDD
является RDD[(String, Array[String])]
если я правильно понял Таким образом, вы можете просто применить flatMap к этому RDD.
val rdd: RDD[(String, Array[String])] = ???
val newRDD = rdd.flatMap{case (key, array) => array.map(value => (key, value))}
newRDD
будет иметь тип RDD[(String, String)]