Как отобразить строки в сгенерированный protobuf класс?
Мне нужно написать задание, которое читает DataSet[Row] и преобразует его в DataSet[CustomClass], где CustomClass является классом protobuf.
val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
case Row(f1: String, f2: Long ) => {
val pbufClass = CustomClass.newBuilder()
.setF1(f1)
.setF2(f2)
pbufClass.build()}}(protoEncoder)
Тем не менее, похоже, что классы Protobuf на самом деле не являются Java Beans, и я получаю NPE на следующем
val x = Encoders.bean(classOf[CustomClass])
Как можно обеспечить, чтобы задание могло генерировать набор данных типа DataSet[CustomClass], где CustomClass - это класс protobuf. Любые указатели / примеры написания пользовательского кодировщика для класса?
NPE:
val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
... 48 elided
Кодировщик Бина внутренне использует
JavaTypeInference.serializerFor(protoClass)
Если я пытаюсь сделать то же самое в моем пользовательском кодировщике, я получаю более описательное сообщение об ошибке:
Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
4 ответа
Мой опыт работы с энкодерами не очень многообещающий, и на данный момент я бы рекомендовал не тратить на это больше времени.
Я предпочел бы подумать об альтернативах и о том, как работать со Spark по-своему и отобразить результат вычислений Spark на сгенерированный protobuf класс на самом последнем этапе.
Для преобразования строки в класс Protobuf вы можете использовать sparksql-protobuf
Эта библиотека предоставляет утилиты для работы с объектами Protobuf в SparkSQL. Он обеспечивает способ чтения файла партера, записанного SparkSQL, обратно в качестве RDD совместимого объекта protobuf. Он также может конвертировать RDD объектов protobuf в DataFrame.
добавить зависимость к вашему build.sbt
файл
resolvers += Resolver.jcenterRepo
libraryDependencies ++= Seq(
"com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
"org.apache.parquet" % "parquet-protobuf" % "1.8.1"
)
Вы можете следовать некоторым примерам из библиотеки, чтобы начать
Надеюсь, это поможет!
То, как я это сделал: я использовал библиотеку saurfang sparksql-protobuf (код доступен на Github). Вы непосредственно получаете СДР [ProtoSchema], но его трудно преобразовать в набор данных [ProtoSchema]. Я использовал его, чтобы получить информацию для добавления в другой СДР, в основном с пользовательскими функциями.
1: импортировать библиотеку
С Maven:
<dependencies>
<dependency>
<groupId>com.github.saurfang</groupId>
<artifactId>sparksql-protobuf_2.10</artifactId>
<version>0.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-protobuf</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
</dependencies>
...
<repositories>
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>bintray-saurfang-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/saurfang/maven</url>
</repository>
</repositories>
2: Считать данные как RDD [ProtoSchema]
val sess: SparkSession = ...
val proto_rdd = new ProtoParquetRDD[ProtoSchema](sess.sparkContext, input_path, classOf[ProtoSchema])
(Необязательно) Добавьте PathFilter (Hadoop API)
Если вы хотите добавить класс PathFilter (как вы это делали в Hadoop) или активировать другие параметры, которые вы использовали в Hadoop, вы можете сделать следующее:
sess.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)
sess.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[MyPathFiltering], classOf[PathFilter])
Но не забудьте очистить конфигурацию Hadoop, если вы хотите использовать SparkSession для чтения других вещей:
sess.sparkContext.hadoopConfiguration.clear()
Хотя это не строгий ответ, я все же нашел обходной путь. Кодеры не нужны, если мы используем RDD.
val rows =
spark.sql("select * from tablename").as[CaseClass].rdd
val transformedRows = rows.map {
case Row(f1: String, f2: Long ) => {
val pbufClass = CustomClass.newBuilder()
.setF1(f1)
.setF2(f2)
pbufClass.build()}}
Это дает мне RDD класса Protobuf, с которым я могу работать.
Сериализация по умолчанию также не работает для моих объектов protobuf.
Однако оказывается, что внутренняя искра использует крио. Итак, если вы это сделаете
Encoders.kryo(ProtoBuffObject.class)
это сработало.