Преобразование JavaPairRDD в Dataframe в Spark Java API

Я использую Spark 1.6 с Java 7

У меня есть пара RDD:

JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(args[0]);

Я хочу преобразовать это в DataFrame со схемой.

Похоже, что сначала я должен конвертировать pair RDD в RowRDD.

Так как же создать RowRdd из PairRDD?

2 ответа

Решение

Для Java 7 вам нужно определить функцию карты

public static final Function<Tuple2<String, String>,Row> mappingFunc = (tuple) -> {
    return RowFactory.create(tuple._1(),tuple._2());
};

Теперь вы можете вызвать эту функцию, чтобы получить JavaRDD<Row>

JavaRDD<Row> rowRDD = filesRDD.map(mappingFunc);

С Java 8 это просто как

JavaRDD<Row> rowRDD = filesRDD.map(tuple -> RowFactory.create(tuple._1(),tuple._2()));

Другой способ получить Dataframe из JavaPairRDD

DataFrame df = sqlContext.createDataset(JavaPairRDD.toRDD(filesRDD), Encoders.tuple(Encoders.STRING(),Encoders.STRING())).toDF();

Ниже приведен один из способов достижения этого.

    //Read whole files
    JavaPairRDD<String, String> pairRDD = sparkContext.wholeTextFiles(path);

    //create a structType for creating the dataframe later. You might want to
    //do this in a different way if your schema is big/complicated. For the sake of this
    //example I took a simple one.
    StructType structType = DataTypes
            .createStructType(
                    new StructField[]{
                            DataTypes.createStructField("id", DataTypes.StringType, true)
                            , DataTypes.createStructField("name", DataTypes.StringType, true)});


    //create an RDD<Row> from pairRDD
    JavaRDD<Row> rowJavaRDD = pairRDD.values().flatMap(new FlatMapFunction<String, Row>() {
        public Iterable<Row> call(String s) throws Exception {
            List<Row> rows = new ArrayList<Row>();
            for (String line : s.split("\n")) {
                String[] values = line.split(",");
                Row row = RowFactory.create(values[0], values[1]);
                rows.add(row);
            }
            return rows;
        }
    });


    //Create Dataframe.
    sqlContext.createDataFrame(rowJavaRDD, structType);

Образец данных, которые я использовал
File1:

1, john  
2, steve

File2:

3, Mike  
4, Mary  

вывод из df.show():

+---+------+
| id|  name|
+---+------+
|  1|  john|
|  2| steve|
|  3|  Mike|
|  4|  Mary|
+---+------+
Другие вопросы по тегам