Структурированное потоковое вещание с использованием Spark 2.0.2, Kafka source и scalapb

Я использую структурированную потоковую передачу (Spark 2.0.2) для использования сообщений kafka. Использование scalapb, сообщения в protobuf. Я получаю следующую ошибку. Пожалуйста помоги..

Исключение в потоке "main" scala.ScalaReflectionException: не является термином в scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) в scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(символы.scala:84) в org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811) в org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39) в org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800) в org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39) в org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$ катализатор $ScalaReflection$$serializerFor(ScalaReflection.scala:582) в org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$ катализатор $ScalaReflection$$serializerFor(ScalaReflection.scala:460) в org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592) в org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583) в scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) в scala.collection.TraversableLike $ $ anonfun $ flatMap $ 1.apply (TraversableLike.scala: 252) в scala.collection.immutable.List.foreach(List.scala:381) в scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) в scala.collection.immutable.List.flatMap(List.scala:344) в org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$ катализатор $ScalaReflection$$serializerFor(ScalaReflection.scala:583) в org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425) в org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) в org.apache.spark.sql.Encoders$.product(Encoders.scala:274) в org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) в PersonConsumer$.main(PersonConsumer.scala:33) в PersonConsumer. Основной (PersonConsumer.scala) в sun.reflect.NativeMethodAccessorImpl.invoke0(собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) в sun.reflect.Delegating.MethodAccessor javaljj.reflect.Method.invoke(Method.java:498) в com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Вот мой код...

object PersonConsumer {
  import org.apache.spark.rdd.RDD
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

    def parseLine(s: String): Person =
      Person.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()

    val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]

    val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")

    val ds4 = spark.sqlContext.sql("select name from persons")

    val query = ds4.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}

2 ответа

Линия с val ds3 должно быть:

val ds3 = ds2.map(str => parseLine(str))

sqlContext.protoToDataFrame(ds3).registerTempTable("persons")

СДР необходимо преобразовать во фрейм данных, прежде чем он будет сохранен как временная таблица.

В классе Person пол - это перечисление, и это стало причиной этой проблемы. После удаления этого поля все работает нормально. Вот ответ, который я получил от Shixiong(Райан) из DataBricks.

Проблема в том, что "необязательный пол = 3;". Сгенерированный класс "Gender" является признаком, и Spark не может знать, как создать признак, поэтому он не поддерживается. Вы можете определить свой класс, который поддерживается SQL Encoder, и преобразовать этот сгенерированный класс в новый класс в parseLine,

Другие вопросы по тегам