Как создать кодировщик для коллекции Scala (для реализации собственного Aggregator)?
Spark 2.3.0 с Scala 2.11. Я реализую кастом Aggregator
в соответствии с документами здесь. Агрегатору требуется 3 типа для ввода, буфера и вывода.
Мой агрегатор должен работать со всеми предыдущими строками в окне, поэтому я объявил это так:
case class Foo(...)
object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// other override methods
override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}
Предполагается, что один из методов переопределения возвращает кодировщик для типа буфера, который в данном случае является ListBuffer
, Я не могу найти подходящий кодировщик для org.apache.spark.sql.Encoders
и никакой другой способ закодировать это, так что я не знаю, что сюда вернуть.
Я думал о создании нового класса case, который имеет единственное свойство типа ListBuffer[Foo]
и используя это как мой буферный класс, а затем используя Encoders.product
на этом, но я не уверен, если это необходимо или есть что-то еще, что я пропускаю. Спасибо за любые советы.
2 ответа
Вы должны просто позволить Spark SQL выполнить свою работу и найти подходящий кодер, используя ExpressionEncoder
следующее:
scala> spark.version
res0: String = 2.3.0
case class Mod(id: Long)
import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
Я не вижу ничего в org.apache.spark.sql.Encoders, который можно было бы использовать для непосредственного кодирования ListBuffer или даже в этом случае List.
Один вариант, кажется, идет с помещением его в класс case, как вы предложили:
import org.apache.spark.sql.Encoders
case class Foo(field: String)
case class Wrapper(lb: scala.collection.mutable.ListBuffer[Foo])
Encoders.product[Wrapper]
Другим вариантом может быть использование крио:
Encoders.kryo[scala.collection.mutable.ListBuffer[Foo]]
Или, наконец, вы можете взглянуть на ExpressionEncoders, который расширяет Encoder:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]
Это лучшее решение, поскольку оно сохраняет все прозрачным для катализатора и, следовательно, позволяет ему выполнять все свои замечательные оптимизации.
Во время игры я заметил одну вещь:
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]].schema == ExpressionEncoder[List[Foo]].schema
Я не проверял ничего из вышеперечисленного во время выполнения агрегации, поэтому могут быть проблемы во время выполнения. Надеюсь, это полезно.