Как преобразовать RDD[GenericRecord] в фрейм данных в Scala?
Я получаю твиты из темы кафки с Avro (сериализатор и десериализатор). Затем я создаю искровой потребитель, который извлекает твиты в Dstream of RDD[GenericRecord]. Теперь я хочу преобразовать каждый rdd в фрейм данных для анализа этих твитов с помощью SQL. Любое решение для преобразования RDD [GenericRecord] в dataframe, пожалуйста?
4 ответа
Я потратил некоторое время, пытаясь заставить эту работу (особенно, как правильно десериализовать данные, но, похоже, вы уже рассмотрели это)... ОБНОВЛЕНО
//Define function to convert from GenericRecord to Row
def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
import scala.collection.JavaConversions._
for (field <- record.getSchema.getFields) {
objectArray(field.pos) = record.get(field.pos)
}
new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
}
//Inside your stream foreachRDD
val yourGenericRecordRDD = ...
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))
var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])
Как вы видите, я использую SchemaConverter, чтобы получить структуру dataframe из схемы, которую вы использовали для десериализации (это может быть более болезненным для реестра схемы). Для этого вам нужна следующая зависимость
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>3.2.0</version>
</dependency>
вам нужно будет изменить свою версию искры в зависимости от вашей.
ОБНОВЛЕНИЕ: приведенный выше код работает только для плоских авро-схем.
Для вложенных структур я использовал что-то другое. Вы можете скопировать класс SchemaConverters, он должен быть внутри com.databricks.spark.avro
(он использует некоторые защищенные классы из пакета databricks) или вы можете попробовать использовать зависимость spark-bigquery. Класс не будет доступен по умолчанию, поэтому вам нужно будет создать класс внутри пакета com.databricks.spark.avro
чтобы получить доступ к заводскому методу.
package com.databricks.spark.avro
import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
class SchemaConverterUtils {
def converterSql(schema : Schema, sqlType : StructType) = {
createConverterToSQL(schema, sqlType)
}
}
После этого вы сможете преобразовать данные как
val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
///
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
...
val rowRdd = genericRecordRDD.flatMap(record => {
Try(converter(record).asInstanceOf[Row]).toOption
})
//To DataFrame
val df = sqlContext.createDataFrame(rowRdd, sqlType)
Комбинация /questions/43417145/kak-konvertirovat-vlozhennyij-avro-genericrecord-v-stroku/43417152#43417152 и /questions/28588914/kak-preobrazovat-rddgenericrecord-v-frejm-dannyih-v-scala/28588924#28588924 работает для меня.
Я использовал следующее для создания MySchemaConversions
package com.databricks.spark.avro
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType
object MySchemaConversions {
def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}
А потом я использовал
val myAvroType = SchemaConverters.toSqlType(schema).dataType
val myAvroRecordConverter = MySchemaConversions.createConverterToSQL(schema, myAvroType)
// unionedResultRdd is unionRDD[GenericRecord]
var rowRDD = unionedResultRdd.map(record => MyObject.myConverter(record, myAvroRecordConverter))
val df = sparkSession.createDataFrame(rowRDD , myAvroType.asInstanceOf[StructType])
Преимущество наличия myConverter в объекте MyObject заключается в том, что вы не столкнетесь с проблемами сериализации (java.io.NotSerializableException).
object MyObject{
def myConverter(record: GenericRecord,
myAvroRecordConverter: (GenericRecord) => Row): Row =
myAvroRecordConverter.apply(record)
}
Хотя что-то вроде этого может помочь вам,
val stream = ...
val dfStream = stream.transform(rdd:RDD[GenericRecord]=>{
val df = rdd.map(_.toSeq)
.map(seq=> Row.fromSeq(seq))
.toDF(col1,col2, ....)
df
})
Я хотел бы предложить вам альтернативный подход. С Spark 2.x вы можете пропустить весь процесс создания DStreams
, Вместо этого вы можете сделать что-то подобное со структурированной потоковой передачей,
val df = ss.readStream
.format("com.databricks.spark.avro")
.load("/path/to/files")
Это даст вам один фрейм данных, который вы можете запросить напрямую. Вот, ss
является примером искровой сессии. /path/to/files
это место, где все ваши avro файлы выгружаются из кафки.
PS: вам может понадобиться импортировать spark-avro
libraryDependencies += "com.databricks" %% "spark-avro" % "4.0.0"
Надеюсь, это помогло. ура
Вы можете использовать createDataFrame(rowRDD: RDD[Row], schema: StructType), который доступен в объекте SQLContext. Пример для преобразования RDD старого DataFrame:
import sqlContext.implicits.
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
Обратите внимание, что нет необходимости явно устанавливать любой столбец схемы. Мы повторно используем старую схему DF, которая имеет класс StructType и может быть легко расширена. Однако такой подход иногда невозможен, а в некоторых случаях может быть менее эффективным, чем первый.