Вывод нескольких столбцов в UDAF Spark
Я получаю некоторые данные от моего mongodb, который выглядит так:
+------+-------+
| view | data |
+------+-------+
| xx | *** |
| yy | *** |
| xx | *** |
+------+-------+
Нет необходимости знать, что внутри.
Я написал UserDefinedAggregateFunction, как это, потому что я хочу сгруппировать по представлению.:
class Extractor() extends UserDefinedAggregateFunction{
override def inputSchema: StructType = // some stuff
override def bufferSchema: StructType =
StructType(
List(
StructField("0",IntegerType,false),
StructField("1",IntegerType,false),
StructField("2",IntegerType,false),
StructField("3",IntegerType,false),
StructField("4",IntegerType,false),
StructField("5",IntegerType,false),
StructField("6",IntegerType,false),
StructField("7",IntegerType,false)
)
)
override def dataType: DataType = bufferSchema
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {
for (x <- 0 to 7){
buffer(x) = 0
}
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = // some stuff
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = // some stuff
override def evaluate(buffer: Row): Any =
var l = List.empty[Integer]
for (x <- 7 to 0 by -1){
l = buffer.getInt(x) :: l
}
l
}
Мой вывод должен быть примерно таким:
+------+---+---+---+---+---+---+---+---+
| view | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+------+---+---+---+---+---+---+---+---+
| xx | 0 | 0 | 4 | 1 | 0 | 0 | 3 | 0 |
| yy | 0 | 0 | 0 | 3 | 0 | 1 | 0 | 0 |
+------+---+---+---+---+---+---+---+---+
Значения рассчитываются в приведенной выше функции обновления / слияния, но это работает, и вам не нужно показывать это.
Тогда я использую это так:
val ex = new Extractor()
val df = dataset.groupBy("view").agg(
ex(dataset.col("data"))
)
df.show()
Когда я выполняю df.show(), он всегда дает мне исключение IndexOutOfBoundException. Я знаю, что это ленивая оценка, поэтому я получаю сообщение об ошибке в df.show().
Насколько я вижу, он может выполнить первую группу и завершить функцию оценки. Но после этого я получаю IndexOutOfBoundException...
Также, когда я изменяю dataType и оцениваю функцию на:
override def dataType: DataType =
ArrayType(IntegerType,false)
override def evaluate(buffer: Row): Any = {
var l = ofDim[Integer](8)
for (x <- 0 to 7){
l(x) = buffer.getInt(x)
}
l
Вывод будет выглядеть так:
+------+------------------------------+
| view | Extractor |
+------+------------------------------+
| xx | [0, 0, 4, 1, 0, 0, 3, 0] |
| yy | [0, 0, 0, 3, 0, 1, 0, 0] |
+------+------------------------------+
И схема выглядит так:
root
|-- view: string (nullable = true)
|-- Extractor: array (nullable = true)
| |-- element: integer (containsNull = false)
И я не смог преобразовать это в форму, которую я хочу.
Поскольку второй подход работает, я думаю, что в первом подходе я что-то связываю с DataType, но я не понимаю, как я могу это исправить...
Многие введения, поэтому на мой вопрос:
Как я могу получить желаемый результат? Мне все равно, какой из двух подходов (сначала с несколькими выходными столбцами или массивом, который может преобразовываться в нужную форму), пока он эффективен.
Спасибо за помощь
1 ответ
Вы определяете агрегированный вывод как список:
override def dataType: DataType = bufferSchema
Так как bufferSchema
Список, это то, что вы получите в конце. Позже вы можете изменить свою схему и преобразовать каждый столбец из вашего списка в новый столбец.
По вашей ошибке разница между:
override def evaluate(buffer: Row): Any =
var l = List.empty[Integer]
for (x <- 7 to 0 by -1){
l = buffer.getInt(x) :: l
}
l
а также
override def evaluate(buffer: Row): Any =
var l = ofDim[Integer](8)
for (x <- 0 to 7){
l = buffer.getInt(x) :: l
}
l
в том, что во втором вы определяете заранее определенное количество столбцов. Таким образом, вы уверены, что вы можете перебирать от 0 до 7 без проблем.
Это не относится к первому примеру, поэтому я подозреваю, что у вас могут быть неверно отформатированные данные, из-за которых ваш буфер будет неправильно инициализирован в initialize
или же merge
, Я предлагаю вам добавить try/catch для проверки размера после каждого шага, который преобразует длину вашего буфера (по крайней мере, initialize
но может быть update
или же merge
тоже).
Чтобы добавить столбец для каждого элемента из вашего списка, вы можете использовать withColumn
или сделать это через карту.