Преобразовать Java-Pair-Rdd в Rdd

Мне нужно преобразовать мою Java-pair-rdd в CSV:

поэтому я думаю преобразовать его в rdd, чтобы решить мою проблему.

я хочу, чтобы мой rdd был преобразован из:

Key   Value
Jack  [a,b,c]

к:

Key  value
Jack  a
Jack  b
Jack  c

я вижу, что это возможно в этой проблеме и в этой проблеме ( PySpark: преобразовать пару RDD обратно в обычную RDD), поэтому я спрашиваю, как это сделать в Java?

Обновление вопроса

Тип моего JavaPairRdd имеет тип:

JavaPairRDD<Tuple2<String,String>, Iterable<Tuple1<String>>>

и это форма строки, которая содержит:

((dr5rvey,dr5ruku),[(2,01/09/2013 00:09,01/09/2013 00:27,N,1,-73.9287262,40.75831223,-73.98726654,40.76442719,2,3.96,16,0.5,0.5,4.25,0,,21.25,1,)])

ключ здесь: (dr5rvey,dr5ruku) и значение [(2,01/09/2013 00:09,01/09/2013 00:27,N,1,-73.9287262,40.75831223,-73.98726654,40.76442719,2,3.96,16,0.5,0.5,4.25,0,,21.25,1,)]

мой оригинальный JavaRdd был типа:

JavaRDD<String>

3 ответа

Понимая, что ключи должны быть сохранены, вы можете использовать функцию flatMapValues:

Передайте каждое значение в паре ключ-значение RDD через функцию flatMap без изменения ключей; ...

JavaPairRDD<Tuple2<String,String>, Iterable<Tuple1<String>>> input = ...;
JavaPairRDD<Tuple2<String, String>, Tuple1<String>> output1 = input.flatMapValues(iter -> iter);
JavaPairRDD<Tuple2<String, String>, String> output2 = output1.mapValues(t1 -> t1._1());

Если я правильно понимаю, вам нужно использовать функцию плоской карты, она позволяет вам создавать несколько строк из одного ключа, например, в Scala(просто идея, которую вам нужно изменить для вашего случая использования):

rdd.flatMap(arg0 => {
        var list = List[Row]()
        list = arg0._2.split(",")
        list
    })

Это супер упрощенный пример, но вы должны понять суть.

для rdd:

key      val
mykey   "a,b,c'

Возвращенный RDD будет:

key      val
mykey   "a"
mykey   "b"
mykey   "c"

Тип вашего RDD является RDD[(String, Array[String])] если я правильно понял Таким образом, вы можете просто применить flatMap к этому RDD.

val rdd: RDD[(String, Array[String])] = ???
val newRDD = rdd.flatMap{case (key, array) => array.map(value => (key, value))}

newRDD будет иметь тип RDD[(String, String)]

Другие вопросы по тегам