Как хранить пользовательские объекты в наборе данных?

В соответствии с введением наборов данных Spark:

В преддверии Spark 2.0 мы планируем несколько интересных улучшений в наборах данных, в частности: ... Пользовательские кодировщики - в то время как в настоящее время мы автоматически генерируем кодировщики для широкого спектра типов, мы хотели бы открыть API для пользовательских объектов.

и пытается сохранить пользовательский тип в Dataset привести к следующей ошибке вроде:

Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы Product (классы дел) поддерживаются за счет импорта sqlContext.implicits._ В следующих выпусках будет добавлена ​​поддержка сериализации других типов.

или же:

Java.lang.UnsupportedOperationException: не найден кодировщик для....

Существуют ли обходные пути?


Обратите внимание, что этот вопрос существует только в качестве отправной точки для ответа сообщества Wiki. Не стесняйтесь обновлять / улучшать как вопрос, так и ответ.

8 ответов

Решение

Обновить

Этот ответ по-прежнему действителен и информативен, хотя с 2.2/2.3 теперь все лучше, в него добавлена ​​поддержка встроенного кодера для Set, Seq, Map, Date, Timestamp, а также BigDecimal, Если вы придерживаетесь создания типов только с классами case и обычными типами Scala, у вас должно получиться только неявное в SQLImplicits,


К сожалению, практически ничего не было добавлено, чтобы помочь с этим. В поисках @since 2.0.0 в Encoders.scala или же SQLImplicits.scala находит вещи, в основном связанные с примитивными типами (и некоторые настройки классов case). Итак, первое, что нужно сказать: в настоящее время нет действительно хорошей поддержки пользовательских кодировщиков классов. После всего этого последуют некоторые уловки, которые делают такую ​​работу, на которую мы можем надеяться, учитывая то, что мы имеем в нашем распоряжении. Как предварительный отказ от ответственности: это не будет работать идеально, и я сделаю все возможное, чтобы все ограничения были ясными и предварительными.

В чем именно проблема

Когда вы хотите создать набор данных, Spark "требуется кодировщик (для преобразования объекта JVM типа T во внутреннее представление Spark SQL и из него), который обычно создается автоматически через последствия от SparkSession или может быть создан явно путем вызова статических методов Encoders " (взято из документов на createDataset). Кодировщик примет форму Encoder[T] где T тип, который вы кодируете Первое предложение добавить import spark.implicits._ (который дает вам эти неявные кодировщики), и второе предложение заключается в явной передаче неявного кодировщика с использованием этого набора функций, связанных с кодировщиком.

Для обычных классов кодер недоступен, поэтому

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

даст вам следующую неявную связанную ошибку времени компиляции:

Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы Product (классы дел) поддерживаются за счет импорта sqlContext.implicits._ В следующих выпусках будет добавлена ​​поддержка сериализации других типов.

Тем не менее, если вы оберните любой тип, который вы только что использовали, чтобы получить вышеупомянутую ошибку в некотором классе, который расширяет Product ошибка ошибочно задерживается во время выполнения, поэтому

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Компилируется просто отлично, но не работает во время выполнения с

java.lang.UnsupportedOperationException: не найден кодировщик для MyObj

Причиной этого является то, что кодировщики, которые Spark создает с импликациями, фактически создаются только во время выполнения (с помощью scala relfection). В этом случае все проверки Spark во время компиляции заключаются в том, что внешний класс расширяется. Product (что делают все классы case) и понимает только во время выполнения, что он все еще не знает, что делать с MyObj (та же проблема возникает, если я попытался сделать Dataset[(Int,MyObj)] - Spark ждет, пока не закончится время работы MyObj). Это основные проблемы, которые крайне необходимо исправить:

  • некоторые классы, которые расширяются Product компилировать, несмотря на то, что всегда происходит сбой во время выполнения и
  • нет способа передать пользовательские кодировщики для вложенных типов (у меня нет способа передать Spark кодировщик просто MyObj такой, что он тогда знает, как кодировать Wrap[MyObj] или же (Int,MyObj)).

Просто используйте kryo

Решение, которое все предлагают, состоит в том, чтобы использовать kryo кодировщик.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Это становится довольно утомительным быстро, хотя. Особенно, если ваш код манипулирует всеми видами наборов данных, объединяет, группирует и т. Д. В конечном итоге вы получаете кучу дополнительных последствий. Итак, почему бы просто не сделать неявное, которое делает все это автоматически?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

И теперь, похоже, я могу сделать почти все, что захочу (пример ниже не будет работать в spark-shell где spark.implicits._ автоматически импортируется)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Или почти. Проблема в том, что с помощью kryo приводит к тому, что Spark просто сохраняет каждую строку в наборе данных как плоский двоичный объект. За map, filter, foreach этого достаточно, но для таких операций, как join Spark действительно нужно разделить на столбцы. Проверка схемы на d2 или же d3 Вы видите, что есть только один двоичный столбец:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Частичное решение для кортежей

Таким образом, используя магию имплицитов в Scala (подробнее в 6.26.3 Разрешение перегрузки), я могу создать серию имплицитов, которые будут работать как можно лучше, по крайней мере, для кортежей, и будут хорошо работать с существующими имплицитами:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Затем, вооружившись этими последствиями, я могу заставить свой пример, приведенный выше, работать, хотя и с некоторым переименованием столбцов.

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Я еще не понял, как получить ожидаемые имена кортежей (_1, _2,...) по умолчанию без переименования их - если кто-то хочет поиграть с этим, это где имя "value" вводится, и именно здесь обычно добавляются имена кортежей. Тем не менее, ключевым моментом является то, что у меня теперь есть хорошая структурированная схема:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Итак, в заключение, это обходной путь:

  • позволяет нам получать отдельные столбцы для кортежей (так что мы можем снова присоединиться к кортежам, ура!)
  • мы снова можем полагаться на последствия (так что нет необходимости проходить через kryo повсюду)
  • почти полностью обратно совместим с import spark.implicits._ (с некоторым переименованием)
  • не позволяет нам присоединиться к kyro сериализованные двоичные столбцы, не говоря уже о полях, которые могут иметь
  • имеет неприятный побочный эффект переименования некоторых столбцов кортежа в "значение" (при необходимости это можно отменить, преобразовав .toDF указание новых имен столбцов и преобразование обратно в набор данных - и имена схем, похоже, сохраняются через объединения, где они наиболее необходимы).

Частичное решение для классов в целом

Этот менее приятный и не имеет хорошего решения. Однако теперь, когда у нас есть решение для кортежей, описанное выше, я догадываюсь, что решение о неявном преобразовании из другого ответа тоже будет немного менее болезненным, поскольку вы можете преобразовывать свои более сложные классы в кортежи. Затем, после создания набора данных, вы, вероятно, переименуете столбцы, используя подход с фреймом данных. Если все пойдет хорошо, это действительно улучшение, так как теперь я могу выполнять соединения на полях своих классов. Если бы я только использовал один плоский двоичный файл kryo Сериализатор, который не был бы возможен.

Вот пример, который делает всего понемногу: у меня есть класс MyObj который имеет поля типов Int, java.util.UUID, а также Set[String], Первый заботится о себе. Второй, хотя я мог бы сериализовать с помощью kryo будет более полезным, если хранится как String (поскольку UUID я, как правило, хочу присоединиться) Третий действительно принадлежит двоичному столбцу.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Теперь я могу создать набор данных с хорошей схемой, используя этот механизм:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

И схема показывает мне столбцы с правильными именами и с двумя первыми вещами, с которыми я могу объединиться.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
  1. Использование универсальных кодировщиков.

    На данный момент доступны два универсальных кодировщика kryo а также javaSerialization где последний явно описывается как:

    крайне неэффективно и должно использоваться только в качестве крайней меры.

    Предполагая следующий класс

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    Вы можете использовать эти кодировщики, добавив неявный кодировщик:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    которые можно использовать вместе следующим образом:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    Он хранит объекты как binary столбец, поэтому при преобразовании в DataFrame Вы получаете следующую схему:

    root
     |-- value: binary (nullable = true)
    

    Также возможно кодировать кортежи, используя kryo кодировщик для конкретного поля:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    Обратите внимание, что здесь мы не зависим от неявных кодеров, но передаем кодировщик явно, так что это, скорее всего, не будет работать с toDS метод.

  2. Использование неявных преобразований:

    Обеспечьте неявные преобразования между представлением, которое может быть закодировано, и пользовательским классом, например:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

Смежные вопросы:

Вы можете использовать UDTRegistration, а затем Case Classes, Tuples и т. Д. - все работает правильно с вашим Определяемым пользователем типом!

Скажем, вы хотите использовать пользовательский Enum:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Зарегистрируйте это так:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

Тогда используйте его!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Скажем, вы хотите использовать Полиморфную запись:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... и использовать это так:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Вы можете написать пользовательский UDT, который кодирует все в байты (здесь я использую сериализацию Java, но, вероятно, лучше использовать инструмент Spark в контексте Kryo).

Сначала определите класс UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Тогда зарегистрируйте это:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

Тогда вы можете использовать это!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Кодеры работают более или менее одинаково в Spark2.0, А также Kryo все еще рекомендуется serialization выбор.

Вы можете посмотреть на следующий пример с spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

До сих пор] не было appropriate encoders в нынешнем объеме, чтобы наши лица не были закодированы как binary ценности. Но это изменится, как только мы предоставим implicit кодеры, использующие Kryo сериализации.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

В случае класса Java Bean это может быть полезно

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Теперь вы можете просто прочитать dataFrame как пользовательский DataFrame

dataFrame.as[MyClass]

Это создаст пользовательский кодер класса, а не двоичный.

Мои примеры будут на Java, но я не думаю, что это будет сложно адаптироваться к Scala.

Я был довольно успешным преобразованием RDD<Fruit> в Dataset<Fruit> используя spark.createDataset и Encoders.bean до тех пор, пока Fruit простой Java-бин

Шаг 1: Создайте простой Java Bean.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Я бы придерживался классов с примитивными типами и String как полей, прежде чем ребята из DataBricks усилили свои кодировщики. Если у вас есть класс с вложенным объектом, создайте еще один простой Java Bean со всеми его полями, чтобы вы могли использовать преобразования RDD для сопоставления сложного типа с более простым. Конечно, это небольшая дополнительная работа, но я думаю, что это сильно поможет производительности при работе с плоской схемой.

Шаг 2: Получите ваш набор данных от RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

И вуаля! Вспенить, промыть, повторить.

Для тех, кто может в моей ситуации, я тоже выложу свой ответ.

Чтобы быть конкретным,

  1. Я читал "Установить типизированные данные" из SQLContext. Таким образом, оригинальный формат данных - DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Затем преобразуйте его в RDD, используя rdd.map() с типом mutable.WrappedArray.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Результат:

    (1,Set(1))

@ Ответ Алека великолепен! Просто чтобы добавить комментарий в этой части своего ответа:

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

@Alec упоминает:

нет возможности передать пользовательские кодировщики для вложенных типов (у меня нет возможности передать Spark кодировщик только для MyObj, чтобы он знал, как кодировать Wrap[MyObj] или (Int,MyObj)).

Кажется так, потому что если я добавлю кодировщик для MyObj:

implicit val myEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]

, он по-прежнему не работает:

java.lang.UnsupportedOperationException: No Encoder found for MyObj
- field (class: "MyObj", name: "unwrap")
- root class: "Wrap"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:643)

Но обратите внимание на важное сообщение об ошибке:

корневой класс: "Wrap"

На самом деле это подсказка, что кодирование MyObjнедостаточно, и вам нужно закодировать всю цепочку, включая Wrap[T].

Итак, если я сделаю это, это решит проблему:

implicit val myWrapperEncoder = org.apache.spark.sql.Encoders.kryo[Wrap[MyObj]]

Следовательно, комментарий @Alec НЕ соответствует действительности:

У меня нет возможности передать Spark кодировщик только для MyObj, чтобы он знал, как кодировать Wrap[MyObj] или (Int,MyObj)

У нас все еще есть способ передать Spark кодировщик для MyObj таким образом, чтобы он знал, как кодировать Wrap[MyObj] или (Int,MyObj).

В дополнение к уже высказанным предложениям, недавно я обнаружил еще один вариант, который заключается в том, что вы можете объявить свой пользовательский класс, включая признак org.apache.spark.sql.catalyst.DefinedByConstructorParams,

Это работает, если у класса есть конструктор, который использует типы, которые ExpressionEncoder может понять, то есть примитивные значения и стандартные коллекции. Это может пригодиться, когда вы не можете объявить класс в качестве класса case, но не хотите использовать Kryo для его кодирования каждый раз, когда он включается в набор данных.

Например, я хотел объявить класс case, включающий вектор Breeze. Единственный кодировщик, который сможет обработать это, как правило, Kryo. Но если бы я объявил подкласс, который расширил Breeze DenseVector и DefinedByConstructorParams, ExpressionEncoder понял, что его можно сериализовать как массив Double.

Вот как я это объявил:

class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]

Теперь я могу использовать SerializableDenseVector в наборе данных (напрямую или в составе продукта) с использованием простого ExpressionEncoder и без Kryo. Он работает так же, как Breeze DenseVector, но сериализуется как массив [Double].

Другие вопросы по тегам