Преобразовать org.apache.avro.generic.GenericRecord в org.apache.spark.sql.Row
У меня есть список org.apache.avro.generic.GenericRecord
, avro schema
используя это нам нужно создать dataframe
с помощью SQLContext
API, для создания dataframe
это нужно RDD
из org.apache.spark.sql.Row
а также avro schema
, Необходимым условием для создания DF является наличие RDD org.apache.spark.sql.Row, и это может быть достигнуто с помощью приведенного ниже кода, но кое-как, как это не работает и выдает ошибку, пример кода.
1. Convert GenericRecord to Row
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
{
val fields = avroSchema.getFields
var rows = new Seq[Row]
for (avroRecord <- genericRecords) {
var avroFieldsSeq = Seq[Any]();
for (i <- 0 to fields.size - 1) {
avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
}
val avroFieldArr = avroFieldsSeq.toArray
val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
rows = rows :+ genericRow
}
return rows;
}
2. Convert `Avro schema` to `Structtype`
Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType
3. Create `Dataframe` using `SQLContext`
val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
val rowRdd = sc.parallelize(rowSeq, 1)
val finalDF =sqlContext.createDataFrame(rowRDD,structType)
Но это создает ошибку при создании DataFrame
, Может кто-нибудь, пожалуйста, помогите мне, что не так в приведенном выше коде. Помимо этого, если кто-то имеет другую логику для преобразования и создания dataframe
,
Всякий раз, когда я буду вызывать какие-либо действия в Dataframe, он будет выполнять DAG и пытаться создать объект DF, но в этом он терпит неудачу с нижеуказанным исключением, как
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
После этого я пытаюсь дать правильную версию jar в параметре jar для отправки submit и с другим параметром, например --conf spark.driver.userClassPathFirst=true, но теперь происходит сбой с MapR как
ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
Мы используем дистрибутив MapR и после изменения пути к классу в spark-submit происходит сбой с вышеуказанным исключением.
Может кто-нибудь, пожалуйста, помогите здесь или моя основная потребность в том, чтобы конвертировать Avro GenericRecord в Spark Row, чтобы я мог создать Dataframe с ним, пожалуйста, помогите
Благодарю.
2 ответа
При создании фрейма данных из RDD[GenericRecord] есть несколько шагов
- Сначала необходимо преобразовать org.apache.avro.generic.GenericRecord в org.apache.spark.sql.Row.
Используйте com.databricks.spark.avro.SchemaConverters.createConverterToSQL( sourceAvroSchema: Schema,targetSqlType: DataType)
это частный метод в версии spark-avro 3.2. Если у нас то же самое или меньше 3.2, скопируйте этот метод в свой собственный класс утилит и используйте его, иначе используйте его напрямую.
- Создать Dataframe из коллекции Row (rowSeq).
val rdd = ssc.sparkContext.parallelize (rowSeq, numParition) val dataframe = sparkSession.createDataFrame (rowRDD, schemaType)
Это решает мою проблему.
Может быть, это поможет кому-то позже прийти в игру.
поскольку spark-avro
устарела и теперь интегрирована в Spark, это можно сделать другим способом.
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.encoders.RowEncoder
...
val avroSchema = data.head.getSchema
val sparkTypes = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val converter = new AvroDeserializer(avroSchema, sparkTypes)
val enconder = RowEncoder.apply(sparkTypes).resolveAndBind()
val rows = data.map { record =>
enconder.fromRow(converter.deserialize(record).asInstanceOf[InternalRow])
}
val df = sparkSession.sqlContext.createDataFrame(sparkSession.sparkContext.parallelize(rows), sparkTypes)
Надеюсь, это поможет. В первой части вы можете найти, как конвертировать из GenericRecord в Row
Как преобразовать RDD[GenericRecord] в фрейм данных в Scala?