Как хранить пользовательские объекты в наборе данных?
В соответствии с введением наборов данных Spark:
В преддверии Spark 2.0 мы планируем несколько интересных улучшений в наборах данных, в частности: ... Пользовательские кодировщики - в то время как в настоящее время мы автоматически генерируем кодировщики для широкого спектра типов, мы хотели бы открыть API для пользовательских объектов.
и пытается сохранить пользовательский тип в Dataset
привести к следующей ошибке вроде:
Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы Product (классы дел) поддерживаются за счет импорта sqlContext.implicits._ В следующих выпусках будет добавлена поддержка сериализации других типов.
или же:
Java.lang.UnsupportedOperationException: не найден кодировщик для....
Существуют ли обходные пути?
Обратите внимание, что этот вопрос существует только в качестве отправной точки для ответа сообщества Wiki. Не стесняйтесь обновлять / улучшать как вопрос, так и ответ.
8 ответов
Обновить
Этот ответ по-прежнему действителен и информативен, хотя с 2.2/2.3 теперь все лучше, в него добавлена поддержка встроенного кодера для Set
, Seq
, Map
, Date
, Timestamp
, а также BigDecimal
, Если вы придерживаетесь создания типов только с классами case и обычными типами Scala, у вас должно получиться только неявное в SQLImplicits
,
К сожалению, практически ничего не было добавлено, чтобы помочь с этим. В поисках @since 2.0.0
в Encoders.scala
или же SQLImplicits.scala
находит вещи, в основном связанные с примитивными типами (и некоторые настройки классов case). Итак, первое, что нужно сказать: в настоящее время нет действительно хорошей поддержки пользовательских кодировщиков классов. После всего этого последуют некоторые уловки, которые делают такую работу, на которую мы можем надеяться, учитывая то, что мы имеем в нашем распоряжении. Как предварительный отказ от ответственности: это не будет работать идеально, и я сделаю все возможное, чтобы все ограничения были ясными и предварительными.
В чем именно проблема
Когда вы хотите создать набор данных, Spark "требуется кодировщик (для преобразования объекта JVM типа T во внутреннее представление Spark SQL и из него), который обычно создается автоматически через последствия от SparkSession
или может быть создан явно путем вызова статических методов Encoders
" (взято из документов на createDataset
). Кодировщик примет форму Encoder[T]
где T
тип, который вы кодируете Первое предложение добавить import spark.implicits._
(который дает вам эти неявные кодировщики), и второе предложение заключается в явной передаче неявного кодировщика с использованием этого набора функций, связанных с кодировщиком.
Для обычных классов кодер недоступен, поэтому
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
даст вам следующую неявную связанную ошибку времени компиляции:
Невозможно найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы Product (классы дел) поддерживаются за счет импорта sqlContext.implicits._ В следующих выпусках будет добавлена поддержка сериализации других типов.
Тем не менее, если вы оберните любой тип, который вы только что использовали, чтобы получить вышеупомянутую ошибку в некотором классе, который расширяет Product
ошибка ошибочно задерживается во время выполнения, поэтому
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Компилируется просто отлично, но не работает во время выполнения с
java.lang.UnsupportedOperationException: не найден кодировщик для MyObj
Причиной этого является то, что кодировщики, которые Spark создает с импликациями, фактически создаются только во время выполнения (с помощью scala relfection). В этом случае все проверки Spark во время компиляции заключаются в том, что внешний класс расширяется. Product
(что делают все классы case) и понимает только во время выполнения, что он все еще не знает, что делать с MyObj
(та же проблема возникает, если я попытался сделать Dataset[(Int,MyObj)]
- Spark ждет, пока не закончится время работы MyObj
). Это основные проблемы, которые крайне необходимо исправить:
- некоторые классы, которые расширяются
Product
компилировать, несмотря на то, что всегда происходит сбой во время выполнения и - нет способа передать пользовательские кодировщики для вложенных типов (у меня нет способа передать Spark кодировщик просто
MyObj
такой, что он тогда знает, как кодироватьWrap[MyObj]
или же(Int,MyObj)
).
Просто используйте kryo
Решение, которое все предлагают, состоит в том, чтобы использовать kryo
кодировщик.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Это становится довольно утомительным быстро, хотя. Особенно, если ваш код манипулирует всеми видами наборов данных, объединяет, группирует и т. Д. В конечном итоге вы получаете кучу дополнительных последствий. Итак, почему бы просто не сделать неявное, которое делает все это автоматически?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
И теперь, похоже, я могу сделать почти все, что захочу (пример ниже не будет работать в spark-shell
где spark.implicits._
автоматически импортируется)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Или почти. Проблема в том, что с помощью kryo
приводит к тому, что Spark просто сохраняет каждую строку в наборе данных как плоский двоичный объект. За map
, filter
, foreach
этого достаточно, но для таких операций, как join
Spark действительно нужно разделить на столбцы. Проверка схемы на d2
или же d3
Вы видите, что есть только один двоичный столбец:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Частичное решение для кортежей
Таким образом, используя магию имплицитов в Scala (подробнее в 6.26.3 Разрешение перегрузки), я могу создать серию имплицитов, которые будут работать как можно лучше, по крайней мере, для кортежей, и будут хорошо работать с существующими имплицитами:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Затем, вооружившись этими последствиями, я могу заставить свой пример, приведенный выше, работать, хотя и с некоторым переименованием столбцов.
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Я еще не понял, как получить ожидаемые имена кортежей (_1
, _2
,...) по умолчанию без переименования их - если кто-то хочет поиграть с этим, это где имя "value"
вводится, и именно здесь обычно добавляются имена кортежей. Тем не менее, ключевым моментом является то, что у меня теперь есть хорошая структурированная схема:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Итак, в заключение, это обходной путь:
- позволяет нам получать отдельные столбцы для кортежей (так что мы можем снова присоединиться к кортежам, ура!)
- мы снова можем полагаться на последствия (так что нет необходимости проходить через
kryo
повсюду) - почти полностью обратно совместим с
import spark.implicits._
(с некоторым переименованием) - не позволяет нам присоединиться к
kyro
сериализованные двоичные столбцы, не говоря уже о полях, которые могут иметь - имеет неприятный побочный эффект переименования некоторых столбцов кортежа в "значение" (при необходимости это можно отменить, преобразовав
.toDF
указание новых имен столбцов и преобразование обратно в набор данных - и имена схем, похоже, сохраняются через объединения, где они наиболее необходимы).
Частичное решение для классов в целом
Этот менее приятный и не имеет хорошего решения. Однако теперь, когда у нас есть решение для кортежей, описанное выше, я догадываюсь, что решение о неявном преобразовании из другого ответа тоже будет немного менее болезненным, поскольку вы можете преобразовывать свои более сложные классы в кортежи. Затем, после создания набора данных, вы, вероятно, переименуете столбцы, используя подход с фреймом данных. Если все пойдет хорошо, это действительно улучшение, так как теперь я могу выполнять соединения на полях своих классов. Если бы я только использовал один плоский двоичный файл kryo
Сериализатор, который не был бы возможен.
Вот пример, который делает всего понемногу: у меня есть класс MyObj
который имеет поля типов Int
, java.util.UUID
, а также Set[String]
, Первый заботится о себе. Второй, хотя я мог бы сериализовать с помощью kryo
будет более полезным, если хранится как String
(поскольку UUID
я, как правило, хочу присоединиться) Третий действительно принадлежит двоичному столбцу.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Теперь я могу создать набор данных с хорошей схемой, используя этот механизм:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
И схема показывает мне столбцы с правильными именами и с двумя первыми вещами, с которыми я могу объединиться.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
Использование универсальных кодировщиков.
На данный момент доступны два универсальных кодировщика
kryo
а такжеjavaSerialization
где последний явно описывается как:крайне неэффективно и должно использоваться только в качестве крайней меры.
Предполагая следующий класс
class Bar(i: Int) { override def toString = s"bar $i" def bar = i }
Вы можете использовать эти кодировщики, добавив неявный кодировщик:
object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] }
которые можно использовать вместе следующим образом:
object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } }
Он хранит объекты как
binary
столбец, поэтому при преобразовании вDataFrame
Вы получаете следующую схему:root |-- value: binary (nullable = true)
Также возможно кодировать кортежи, используя
kryo
кодировщик для конкретного поля:val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
Обратите внимание, что здесь мы не зависим от неявных кодеров, но передаем кодировщик явно, так что это, скорее всего, не будет работать с
toDS
метод.Использование неявных преобразований:
Обеспечьте неявные преобразования между представлением, которое может быть закодировано, и пользовательским классом, например:
object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } }
Смежные вопросы:
Вы можете использовать UDTRegistration, а затем Case Classes, Tuples и т. Д. - все работает правильно с вашим Определяемым пользователем типом!
Скажем, вы хотите использовать пользовательский Enum:
trait CustomEnum { def value:String }
case object Foo extends CustomEnum { val value = "F" }
case object Bar extends CustomEnum { val value = "B" }
object CustomEnum {
def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}
Зарегистрируйте это так:
// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
override def sqlType: DataType = org.apache.spark.sql.types.StringType
override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
// Note that this will be a UTF8String type
override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}
// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
Тогда используйте его!
case class UsingCustomEnum(id:Int, en:CustomEnum)
val seq = Seq(
UsingCustomEnum(1, Foo),
UsingCustomEnum(2, Bar),
UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())
Скажем, вы хотите использовать Полиморфную запись:
trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly
... и использовать это так:
case class UsingPoly(id:Int, poly:CustomPoly)
Seq(
UsingPoly(1, new FooPoly(1)),
UsingPoly(2, new BarPoly("Blah", 123)),
UsingPoly(3, new FooPoly(1))
).toDS
polySeq.filter(_.poly match {
case FooPoly(value) => value == 1
case _ => false
}).show()
Вы можете написать пользовательский UDT, который кодирует все в байты (здесь я использую сериализацию Java, но, вероятно, лучше использовать инструмент Spark в контексте Kryo).
Сначала определите класс UDT:
class CustomPolyUDT extends UserDefinedType[CustomPoly] {
val kryo = new Kryo()
override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
override def serialize(obj: CustomPoly): Any = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(obj)
bos.toByteArray
}
override def deserialize(datum: Any): CustomPoly = {
val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
val ois = new ObjectInputStream(bis)
val obj = ois.readObject()
obj.asInstanceOf[CustomPoly]
}
override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}
Тогда зарегистрируйте это:
// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
Тогда вы можете использовать это!
// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)
Seq(
UsingPoly(1, new FooPoly(1)),
UsingPoly(2, new BarPoly("Blah", 123)),
UsingPoly(3, new FooPoly(1))
).toDS
polySeq.filter(_.poly match {
case FooPoly(value) => value == 1
case _ => false
}).show()
Кодеры работают более или менее одинаково в Spark2.0
, А также Kryo
все еще рекомендуется serialization
выбор.
Вы можете посмотреть на следующий пример с spark-shell
scala> import spark.implicits._
import spark.implicits._
scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders
scala> case class NormalPerson(name: String, age: Int) {
| def aboutMe = s"I am ${name}. I am ${age} years old."
| }
defined class NormalPerson
scala> case class ReversePerson(name: Int, age: String) {
| def aboutMe = s"I am ${name}. I am ${age} years old."
| }
defined class ReversePerson
scala> val normalPersons = Seq(
| NormalPerson("Superman", 25),
| NormalPerson("Spiderman", 17),
| NormalPerson("Ironman", 29)
| )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))
scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]
scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
scala> ds1.show()
+---------+---+
| name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
| Ironman| 29|
+---------+---+
scala> ds2.show()
+----+---------+
|name| age|
+----+---------+
| 25| Superman|
| 17|Spiderman|
| 29| Ironman|
+----+---------+
scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.
scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
До сих пор] не было appropriate encoders
в нынешнем объеме, чтобы наши лица не были закодированы как binary
ценности. Но это изменится, как только мы предоставим implicit
кодеры, использующие Kryo
сериализации.
// Provide Encoders
scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]
scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]
// Ecoders will be used since they are now present in Scope
scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]
scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]
// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
| value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+
scala> ds4.show()
+--------------------+
| value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+
// Our instances still work as expected
scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.
scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
В случае класса Java Bean это может быть полезно
import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])
Теперь вы можете просто прочитать dataFrame как пользовательский DataFrame
dataFrame.as[MyClass]
Это создаст пользовательский кодер класса, а не двоичный.
Мои примеры будут на Java, но я не думаю, что это будет сложно адаптироваться к Scala.
Я был довольно успешным преобразованием RDD<Fruit>
в Dataset<Fruit>
используя spark.createDataset и Encoders.bean до тех пор, пока Fruit
простой Java-бин
Шаг 1: Создайте простой Java Bean.
public class Fruit implements Serializable {
private String name = "default-fruit";
private String color = "default-color";
// AllArgsConstructor
public Fruit(String name, String color) {
this.name = name;
this.color = color;
}
// NoArgsConstructor
public Fruit() {
this("default-fruit", "default-color");
}
// ...create getters and setters for above fields
// you figure it out
}
Я бы придерживался классов с примитивными типами и String как полей, прежде чем ребята из DataBricks усилили свои кодировщики. Если у вас есть класс с вложенным объектом, создайте еще один простой Java Bean со всеми его полями, чтобы вы могли использовать преобразования RDD для сопоставления сложного типа с более простым. Конечно, это небольшая дополнительная работа, но я думаю, что это сильно поможет производительности при работе с плоской схемой.
Шаг 2: Получите ваш набор данных от RDD
SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();
List<Fruit> fruitList = ImmutableList.of(
new Fruit("apple", "red"),
new Fruit("orange", "orange"),
new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);
RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);
И вуаля! Вспенить, промыть, повторить.
Для тех, кто может в моей ситуации, я тоже выложу свой ответ.
Чтобы быть конкретным,
Я читал "Установить типизированные данные" из SQLContext. Таким образом, оригинальный формат данных - DataFrame.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
Затем преобразуйте его в RDD, используя rdd.map() с типом mutable.WrappedArray.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
Результат:
(1,Set(1))
@ Ответ Алека великолепен! Просто чтобы добавить комментарий в этой части своего ответа:
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
@Alec упоминает:
нет возможности передать пользовательские кодировщики для вложенных типов (у меня нет возможности передать Spark кодировщик только для MyObj, чтобы он знал, как кодировать Wrap[MyObj] или (Int,MyObj)).
Кажется так, потому что если я добавлю кодировщик для
MyObj
:
implicit val myEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
, он по-прежнему не работает:
java.lang.UnsupportedOperationException: No Encoder found for MyObj
- field (class: "MyObj", name: "unwrap")
- root class: "Wrap"
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:643)
Но обратите внимание на важное сообщение об ошибке:
корневой класс: "Wrap"
На самом деле это подсказка, что кодирование
MyObj
недостаточно, и вам нужно закодировать всю цепочку, включая
Wrap[T]
.
Итак, если я сделаю это, это решит проблему:
implicit val myWrapperEncoder = org.apache.spark.sql.Encoders.kryo[Wrap[MyObj]]
Следовательно, комментарий @Alec НЕ соответствует действительности:
У меня нет возможности передать Spark кодировщик только для MyObj, чтобы он знал, как кодировать Wrap[MyObj] или (Int,MyObj)
У нас все еще есть способ передать Spark кодировщик для
MyObj
таким образом, чтобы он знал, как кодировать Wrap[MyObj] или (Int,MyObj).
В дополнение к уже высказанным предложениям, недавно я обнаружил еще один вариант, который заключается в том, что вы можете объявить свой пользовательский класс, включая признак org.apache.spark.sql.catalyst.DefinedByConstructorParams
,
Это работает, если у класса есть конструктор, который использует типы, которые ExpressionEncoder может понять, то есть примитивные значения и стандартные коллекции. Это может пригодиться, когда вы не можете объявить класс в качестве класса case, но не хотите использовать Kryo для его кодирования каждый раз, когда он включается в набор данных.
Например, я хотел объявить класс case, включающий вектор Breeze. Единственный кодировщик, который сможет обработать это, как правило, Kryo. Но если бы я объявил подкласс, который расширил Breeze DenseVector и DefinedByConstructorParams, ExpressionEncoder понял, что его можно сериализовать как массив Double.
Вот как я это объявил:
class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]
Теперь я могу использовать SerializableDenseVector
в наборе данных (напрямую или в составе продукта) с использованием простого ExpressionEncoder и без Kryo. Он работает так же, как Breeze DenseVector, но сериализуется как массив [Double].