Spark: "Невозможно использовать UnspecifiedFrame. Это должно быть преобразовано во время анализа. Пожалуйста, отправьте отчет об ошибке"
Spark 2.3.0 с Scala 2.11. Я пытаюсь написать собственный агрегатор и запустить его через оконную функцию для этих документов, но получаю ошибку в заголовке. Вот урезанный пример. Это написано как тест FunSuite.
Я знаю, что в сообщении об ошибке говорится о том, что нужно отправить отчет об ошибке, но это такой простой пример, взятый почти непосредственно из документации, и мне интересно, есть ли в моем коде что-то, что вызывает ошибку. Интересно, если использование типа коллекции в качестве буфера как-то не поддерживается или необычно. Я не нашел других вопросов о переполнении стека с этой ошибкой.
package com.foobar;
import com.holdenkarau.spark.testing._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.{Aggregator, Window}
import org.scalatest.FunSuite
import org.slf4j.LoggerFactory
import scala.collection.mutable.ListBuffer
case class Foo(ID:Long, RANK:Int)
class Example extends FunSuite with DataFrameSuiteBase {
test("example") {
val data = Seq(
Foo(5555, 1),
Foo(5555, 2),
Foo(8888, 1),
Foo(8888, 2)
)
import spark.implicits._
val df = sc.parallelize(data).toDF
val w = Window.partitionBy("ID").orderBy("RANK")
// The three types are: input, buffer, and output.
object AggregateFoos extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
override def zero: ListBuffer[Foo] = new ListBuffer[Foo]()
override def reduce(b: ListBuffer[Foo], a: Foo): ListBuffer[Foo] = {
b += a
}
// Merge two intermediate values
override def merge(b1: ListBuffer[Foo], b2: ListBuffer[Foo]): ListBuffer[Foo] = {
(b1 ++ b2).sortBy(b => b.RANK)
}
// Transform the output of the reduction
override def finish(reduction: ListBuffer[Foo]): Boolean = {
true // in real life there would be logic here
}
// Specifies the Encoder for the intermediate value type
override def bufferEncoder: Encoder[ListBuffer[Foo]] = {
ExpressionEncoder()
}
// Specifies the Encoder for the final output value type
override def outputEncoder: Encoder[Boolean] = {
ExpressionEncoder()
}
}
val agg = AggregateFoos.toColumn.name("agg")
df.select(df.col("*"), agg.over(w).as("agg")).show(false)
}
}
Вот сообщение об ошибке:
org.apache.spark.sql.AnalysisException: не удается разрешить '(PARTITION BY
ID
СОРТИРОВАТЬ ПОRANK
ASC NULLS FIRST unspecifiedframe$())'из-за несоответствия типов данных: невозможно использовать UnspecifiedFrame. Это должно было быть преобразовано во время анализа. Пожалуйста, отправьте отчет об ошибке.;;
Ниже приводится полное исключение.
org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `ID` ORDER BY `RANK` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.;;
'Aggregate [ID#2L, RANK#3, aggregatefoos(AggregateFoos$@592e7718, None, None, None, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class Foo), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class Foo), true))) null else named_struct(ID, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class Foo), true)).ID, RANK, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class Foo), true)).RANK), input[0, scala.collection.mutable.ListBuffer, true], None) AS value#9, mapobjects(MapObjects_loopValue1, MapObjects_loopIsNull1, StructField(ID,LongType,false), StructField(RANK,IntegerType,false), if (isnull(lambdavariable(MapObjects_loopValue1, MapObjects_loopIsNull1, StructField(ID,LongType,false), StructField(RANK,IntegerType,false), true))) null else newInstance(class Foo), input[0, array<struct<ID:bigint,RANK:int>>, true], Some(class scala.collection.mutable.ListBuffer)), input[0, boolean, false] AS value#8, BooleanType, false, 0, 0) windowspecdefinition(ID#2L, RANK#3 ASC NULLS FIRST, unspecifiedframe$()) AS agg#14]
+- AnalysisBarrier
+- LocalRelation [ID#2L, RANK#3]
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3296)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1307)
... 49 elided
Насколько я могу судить, это какая-то внутренняя ошибка Spark, я в недоумении, любая помощь приветствуется.