Как отобразить строки в сгенерированный protobuf класс?

Мне нужно написать задание, которое читает DataSet[Row] и преобразует его в DataSet[CustomClass], где CustomClass является классом protobuf.

val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}(protoEncoder)

Тем не менее, похоже, что классы Protobuf на самом деле не являются Java Beans, и я получаю NPE на следующем

val x =  Encoders.bean(classOf[CustomClass])

Как можно обеспечить, чтобы задание могло генерировать набор данных типа DataSet[CustomClass], где CustomClass - это класс protobuf. Любые указатели / примеры написания пользовательского кодировщика для класса?

NPE:

val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
  ... 48 elided

Кодировщик Бина внутренне использует

JavaTypeInference.serializerFor(protoClass)

Если я пытаюсь сделать то же самое в моем пользовательском кодировщике, я получаю более описательное сообщение об ошибке:

Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
        at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
        at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
        at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
        at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)

4 ответа

Мой опыт работы с энкодерами не очень многообещающий, и на данный момент я бы рекомендовал не тратить на это больше времени.

Я предпочел бы подумать об альтернативах и о том, как работать со Spark по-своему и отобразить результат вычислений Spark на сгенерированный protobuf класс на самом последнем этапе.

Для преобразования строки в класс Protobuf вы можете использовать sparksql-protobuf

Эта библиотека предоставляет утилиты для работы с объектами Protobuf в SparkSQL. Он обеспечивает способ чтения файла партера, записанного SparkSQL, обратно в качестве RDD совместимого объекта protobuf. Он также может конвертировать RDD объектов protobuf в DataFrame.

добавить зависимость к вашему build.sbt файл

resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
    "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
    "org.apache.parquet" % "parquet-protobuf" % "1.8.1"

)

Вы можете следовать некоторым примерам из библиотеки, чтобы начать

Пример 1

Пример 2

Надеюсь, это поможет!

То, как я это сделал: я использовал библиотеку saurfang sparksql-protobuf (код доступен на Github). Вы непосредственно получаете СДР [ProtoSchema], но его трудно преобразовать в набор данных [ProtoSchema]. Я использовал его, чтобы получить информацию для добавления в другой СДР, в основном с пользовательскими функциями.

1: импортировать библиотеку

С Maven:

<dependencies>
    <dependency>
        <groupId>com.github.saurfang</groupId>
        <artifactId>sparksql-protobuf_2.10</artifactId>
        <version>0.1.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-protobuf</artifactId>
        <version>1.9.0</version>
    </dependency>

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.5.1</version>
    </dependency>
</dependencies>
...

<repositories>
    <repository>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <id>bintray-saurfang-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/saurfang/maven</url>
    </repository>
</repositories>

2: Считать данные как RDD [ProtoSchema]

val sess: SparkSession = ...
val proto_rdd = new ProtoParquetRDD[ProtoSchema](sess.sparkContext, input_path, classOf[ProtoSchema])

(Необязательно) Добавьте PathFilter (Hadoop API)

Если вы хотите добавить класс PathFilter (как вы это делали в Hadoop) или активировать другие параметры, которые вы использовали в Hadoop, вы можете сделать следующее:

sess.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)
sess.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[MyPathFiltering], classOf[PathFilter])

Но не забудьте очистить конфигурацию Hadoop, если вы хотите использовать SparkSession для чтения других вещей:

sess.sparkContext.hadoopConfiguration.clear()

Хотя это не строгий ответ, я все же нашел обходной путь. Кодеры не нужны, если мы используем RDD.

val rows =
      spark.sql("select * from tablename").as[CaseClass].rdd
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}

Это дает мне RDD класса Protobuf, с которым я могу работать.

Сериализация по умолчанию также не работает для моих объектов protobuf.

Однако оказывается, что внутренняя искра использует крио. Итак, если вы это сделаете

Encoders.kryo(ProtoBuffObject.class)

это сработало.