Как преобразовать RDD[GenericRecord] в фрейм данных в Scala?

Я получаю твиты из темы кафки с Avro (сериализатор и десериализатор). Затем я создаю искровой потребитель, который извлекает твиты в Dstream of RDD[GenericRecord]. Теперь я хочу преобразовать каждый rdd в фрейм данных для анализа этих твитов с помощью SQL. Любое решение для преобразования RDD [GenericRecord] в dataframe, пожалуйста?

4 ответа

Решение

Я потратил некоторое время, пытаясь заставить эту работу (особенно, как правильно десериализовать данные, но, похоже, вы уже рассмотрели это)... ОБНОВЛЕНО

  //Define function to convert from GenericRecord to Row
  def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
    val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
    import scala.collection.JavaConversions._
    for (field <- record.getSchema.getFields) {
      objectArray(field.pos) = record.get(field.pos)
    }

    new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
  }

//Inside your stream foreachRDD
val yourGenericRecordRDD = ... 
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))

var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])

Как вы видите, я использую SchemaConverter, чтобы получить структуру dataframe из схемы, которую вы использовали для десериализации (это может быть более болезненным для реестра схемы). Для этого вам нужна следующая зависимость

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>

вам нужно будет изменить свою версию искры в зависимости от вашей.

ОБНОВЛЕНИЕ: приведенный выше код работает только для плоских авро-схем.

Для вложенных структур я использовал что-то другое. Вы можете скопировать класс SchemaConverters, он должен быть внутри com.databricks.spark.avro (он использует некоторые защищенные классы из пакета databricks) или вы можете попробовать использовать зависимость spark-bigquery. Класс не будет доступен по умолчанию, поэтому вам нужно будет создать класс внутри пакета com.databricks.spark.avro чтобы получить доступ к заводскому методу.

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

class SchemaConverterUtils {

  def converterSql(schema : Schema, sqlType : StructType) = {
    createConverterToSQL(schema, sqlType)
  }

}

После этого вы сможете преобразовать данные как

val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
/// 
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
... 
val rowRdd = genericRecordRDD.flatMap(record => {
        Try(converter(record).asInstanceOf[Row]).toOption
      })
//To DataFrame
 val df = sqlContext.createDataFrame(rowRdd, sqlType)

Комбинация /questions/43417145/kak-konvertirovat-vlozhennyij-avro-genericrecord-v-stroku/43417152#43417152 и /questions/28588914/kak-preobrazovat-rddgenericrecord-v-frejm-dannyih-v-scala/28588924#28588924 работает для меня.

Я использовал следующее для создания MySchemaConversions

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

А потом я использовал

val myAvroType = SchemaConverters.toSqlType(schema).dataType
val myAvroRecordConverter = MySchemaConversions.createConverterToSQL(schema, myAvroType)

// unionedResultRdd is unionRDD[GenericRecord]

var rowRDD = unionedResultRdd.map(record => MyObject.myConverter(record, myAvroRecordConverter))
 val df = sparkSession.createDataFrame(rowRDD , myAvroType.asInstanceOf[StructType])

Преимущество наличия myConverter в объекте MyObject заключается в том, что вы не столкнетесь с проблемами сериализации (java.io.NotSerializableException).

object MyObject{
    def myConverter(record: GenericRecord,
        myAvroRecordConverter: (GenericRecord) => Row): Row =
            myAvroRecordConverter.apply(record)
}

Хотя что-то вроде этого может помочь вам,

val stream = ...

val dfStream = stream.transform(rdd:RDD[GenericRecord]=>{
     val df = rdd.map(_.toSeq)
              .map(seq=> Row.fromSeq(seq))
              .toDF(col1,col2, ....)

     df
})

Я хотел бы предложить вам альтернативный подход. С Spark 2.x вы можете пропустить весь процесс создания DStreams, Вместо этого вы можете сделать что-то подобное со структурированной потоковой передачей,

val df = ss.readStream
  .format("com.databricks.spark.avro")
  .load("/path/to/files")

Это даст вам один фрейм данных, который вы можете запросить напрямую. Вот, ss является примером искровой сессии. /path/to/files это место, где все ваши avro файлы выгружаются из кафки.

PS: вам может понадобиться импортировать spark-avro

libraryDependencies += "com.databricks" %% "spark-avro" % "4.0.0"

Надеюсь, это помогло. ура

Вы можете использовать createDataFrame(rowRDD: RDD[Row], schema: StructType), который доступен в объекте SQLContext. Пример для преобразования RDD старого DataFrame:

import sqlContext.implicits.
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Обратите внимание, что нет необходимости явно устанавливать любой столбец схемы. Мы повторно используем старую схему DF, которая имеет класс StructType и может быть легко расширена. Однако такой подход иногда невозможен, а в некоторых случаях может быть менее эффективным, чем первый.

Другие вопросы по тегам