Как объединить два набора данных искры в один с Java-объектами?

У меня есть небольшая проблема объединения двух наборов данных в искре, у меня есть это:

SparkConf conf = new SparkConf()
    .setAppName("MyFunnyApp")
    .setMaster("local[*]");

SparkSession spark = SparkSession
    .builder()
    .config(conf)
    .config("spark.debug.maxToStringFields", 150)
    .getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile1)
    .as(encoderObject1);

Dataset<MyOwnObject2> object2DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile2)
    .as(encoderObject2);

Я могу распечатать схему и показать ее правильно.

//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = 
    object1DS.join(object2DS, object1DS.col("column01")
    .equalTo(object2DS.col("column01")))
    .as(Encoders.tuple(MyOwnObject1,MyOwnObject2));

Последняя строка не может присоединиться и получить мне эту ошибку:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;

Это правда, потому что Tuple2 (object2) не имеет всех переменных...

Тогда я попробовал это:

 Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
    .joinWith(object2DS, object1DS
        .col("column01")
        .equalTo(object2DS.col("column01")));

И работает отлично! Но мне нужен новый набор данных без кортежа, у меня есть object3, у которого есть некоторые переменные из object1 и object2, тогда у меня есть эта проблема:

Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
    MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
    MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
    MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values
    //...
    //Sets data from object 1 and 2 to 3.
    //...
    return myOwnObject3;
}, encoderObject3);

Сбой!... вот ошибка:

17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import

и более тысячи строк ошибок...

Что я могу сделать? Я пытался:

  • Сделать мой объект только с String, int (или Integer) и double (или Double) (не более)
  • используйте разные кодировщики, такие как kryo или javaSerialization
  • использовать JavaRDD (работает! но очень медленно) и использовать Dataframes со строками (работает, но мне нужно изменить много объектов)
  • Все мои объекты Java являются сериализуемыми
  • использовать искры 2.1.0 и 2.1.1, теперь у меня есть 2.1.1 на моем pom.xml

Я хочу использовать наборы данных, чтобы использовать скорость из Dataframes и объектный синтаксис из JavaRDD...

Помогите?

Спасибо

1 ответ

Решение

Наконец я нашел решение,

У меня была проблема с опцией inferSchema, когда мой код создавал набор данных. У меня есть столбец String, в котором опция inferSchema возвращает мне столбец Integer, потому что все значения являются "числовыми", но мне нужно использовать их как String (например, "0001", "0002"...). Мне нужно сделать схему, но У меня есть много переменных, затем я пишу это со всеми моими классами:

List<StructField> fieldsObject1 = new ArrayList<>();
for (Field field : MyOwnObject1.class.getDeclaredFields()) {
    fieldsObject1.add(DataTypes.createStructField(
        field.getName(),
        CatalystSqlParser.parseDataType(field.getType().getSimpleName()),
        true)
    );
}
StructType schemaObject1 = DataTypes.createStructType(fieldsObject1);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .schema(schemaObject1)
    .csv(pathToFile1)
    .as(encoderObject1);

Работает отлично.

"Лучшее" решение будет следующим:

  Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .schema(encoderObject1.schema())
    .csv(pathToFile1)
    .as(encoderObject1);

но encoderObject1.schema() возвращает мне схему с переменными в алфавитном порядке, а не в исходном порядке, тогда эта опция не работает, когда я читаю CSV. Может быть, кодировщики должны вернуть схему с переменными в исходном порядке, а не в алфавитном порядке

Другие вопросы по тегам