Преобразовать org.apache.avro.generic.GenericRecord в org.apache.spark.sql.Row

У меня есть список org.apache.avro.generic.GenericRecord, avro schemaиспользуя это нам нужно создать dataframe с помощью SQLContext API, для создания dataframe это нужно RDD из org.apache.spark.sql.Row а также avro schema, Необходимым условием для создания DF является наличие RDD org.apache.spark.sql.Row, и это может быть достигнуто с помощью приведенного ниже кода, но кое-как, как это не работает и выдает ошибку, пример кода.

 1. Convert GenericRecord to Row
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.avro.Schema
    import org.apache.spark.sql.types.StructType
    def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
    {
      val fields = avroSchema.getFields
      var rows = new Seq[Row]
      for (avroRecord <- genericRecords) {
        var avroFieldsSeq = Seq[Any]();
        for (i <- 0 to fields.size - 1) {
          avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
        }
        val avroFieldArr = avroFieldsSeq.toArray
        val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
        rows = rows :+ genericRow
      }
      return rows;
    }

2. Convert `Avro schema` to `Structtype`
   Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType

3. Create `Dataframe` using `SQLContext`
   val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
   val rowRdd = sc.parallelize(rowSeq, 1)
   val finalDF =sqlContext.createDataFrame(rowRDD,structType)

Но это создает ошибку при создании DataFrame, Может кто-нибудь, пожалуйста, помогите мне, что не так в приведенном выше коде. Помимо этого, если кто-то имеет другую логику для преобразования и создания dataframe,

Всякий раз, когда я буду вызывать какие-либо действия в Dataframe, он будет выполнять DAG и пытаться создать объект DF, но в этом он терпит неудачу с нижеуказанным исключением, как

 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
 Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
                        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
                        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

После этого я пытаюсь дать правильную версию jar в параметре jar для отправки submit и с другим параметром, например --conf spark.driver.userClassPathFirst=true, но теперь происходит сбой с MapR как

ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
                    at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
                    at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
                    at java.lang.Class.forName0(Native Method)

Мы используем дистрибутив MapR и после изменения пути к классу в spark-submit происходит сбой с вышеуказанным исключением.

Может кто-нибудь, пожалуйста, помогите здесь или моя основная потребность в том, чтобы конвертировать Avro GenericRecord в Spark Row, чтобы я мог создать Dataframe с ним, пожалуйста, помогите
Благодарю.

2 ответа

Решение

При создании фрейма данных из RDD[GenericRecord] есть несколько шагов

  1. Сначала необходимо преобразовать org.apache.avro.generic.GenericRecord в org.apache.spark.sql.Row.

Используйте com.databricks.spark.avro.SchemaConverters.createConverterToSQL( sourceAvroSchema: Schema,targetSqlType: DataType)

это частный метод в версии spark-avro 3.2. Если у нас то же самое или меньше 3.2, скопируйте этот метод в свой собственный класс утилит и используйте его, иначе используйте его напрямую.

  1. Создать Dataframe из коллекции Row (rowSeq).

val rdd = ssc.sparkContext.parallelize (rowSeq, numParition) val dataframe = sparkSession.createDataFrame (rowRDD, schemaType)

Это решает мою проблему.

Может быть, это поможет кому-то позже прийти в игру.

поскольку spark-avro устарела и теперь интегрирована в Spark, это можно сделать другим способом.

import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.encoders.RowEncoder

...

val avroSchema = data.head.getSchema
val sparkTypes = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val converter = new AvroDeserializer(avroSchema, sparkTypes)
val enconder = RowEncoder.apply(sparkTypes).resolveAndBind()

val rows = data.map { record =>
    enconder.fromRow(converter.deserialize(record).asInstanceOf[InternalRow])
}

val df = sparkSession.sqlContext.createDataFrame(sparkSession.sparkContext.parallelize(rows), sparkTypes)

Надеюсь, это поможет. В первой части вы можете найти, как конвертировать из GenericRecord в Row

Как преобразовать RDD[GenericRecord] в фрейм данных в Scala?

Другие вопросы по тегам