Как создать кодировщик для коллекции Scala (для реализации собственного Aggregator)?

Spark 2.3.0 с Scala 2.11. Я реализую кастом Aggregator в соответствии с документами здесь. Агрегатору требуется 3 типа для ввода, буфера и вывода.

Мой агрегатор должен работать со всеми предыдущими строками в окне, поэтому я объявил это так:

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}

Предполагается, что один из методов переопределения возвращает кодировщик для типа буфера, который в данном случае является ListBuffer, Я не могу найти подходящий кодировщик для org.apache.spark.sql.Encoders и никакой другой способ закодировать это, так что я не знаю, что сюда вернуть.

Я думал о создании нового класса case, который имеет единственное свойство типа ListBuffer[Foo] и используя это как мой буферный класс, а затем используя Encoders.product на этом, но я не уверен, если это необходимо или есть что-то еще, что я пропускаю. Спасибо за любые советы.

2 ответа

Решение

Вы должны просто позволить Spark SQL выполнить свою работу и найти подходящий кодер, используя ExpressionEncoder следующее:

scala> spark.version
res0: String = 2.3.0

case class Mod(id: Long)

import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]

Я не вижу ничего в org.apache.spark.sql.Encoders, который можно было бы использовать для непосредственного кодирования ListBuffer или даже в этом случае List.

Один вариант, кажется, идет с помещением его в класс case, как вы предложили:

import org.apache.spark.sql.Encoders

case class Foo(field: String)
case class Wrapper(lb: scala.collection.mutable.ListBuffer[Foo])
Encoders.product[Wrapper]

Другим вариантом может быть использование крио:

Encoders.kryo[scala.collection.mutable.ListBuffer[Foo]]

Или, наконец, вы можете взглянуть на ExpressionEncoders, который расширяет Encoder:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]

Это лучшее решение, поскольку оно сохраняет все прозрачным для катализатора и, следовательно, позволяет ему выполнять все свои замечательные оптимизации.

Во время игры я заметил одну вещь:

ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]].schema == ExpressionEncoder[List[Foo]].schema

Я не проверял ничего из вышеперечисленного во время выполнения агрегации, поэтому могут быть проблемы во время выполнения. Надеюсь, это полезно.

Другие вопросы по тегам