Получение исключения при использовании предложения Dataframe where
Я пытаюсь выполнить пример операции на Datarame, где пункт.
Вот мои примерные данные таблицы:
address district
hyderabad 001
delhi 002
mumbai 003
Теперь мне нужно оценить адрес, макс (район) с помощью DataFrame.
Результат будет таким:
Мумбаи 003
Обходной путь:
это код, который я пробовал до сих пор..,
SparkConf conf = new SparkConf();
conf.set("spark.app.name", "max");
conf.set("spark.master", "local");
conf.set("spark.ui.port", "7077");
SparkContext ctx=new SparkContext(conf);
SQLContext sqlContext = new SQLContext(ctx);
DataFrame df = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("/Users/hadoop/Downloads/SacramentocrimeJanuary2006.csv");
//df.registerTempTable("consumer");
//Row[] result = df.orderBy("cdatetime").select("cdatetime","address").collect();
//DataFrame a = df.select("address","district").agg(functions.count("district"),functions.col("address")).orderBy("address");
DataFrame b =df.select("address","district").where("district=max(district)");
b.show();
}
Вот мое исключение:
Cannot evaluate expression: (max(input[1, IntegerType]),mode=Complete,isDistinct=false)
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.genCode(Expression.scala:233)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.genCode(interfaces.scala:73)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:419)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:401)
at org.apache.spark.sql.catalyst.expressions.EqualTo.genCode(predicates.scala:379)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102)
at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:42)
at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:33)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:635)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:632)
at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:242)
at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:71)
at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:70)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/12/09 10:50:57 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
2 ответа
Вы должны использовать "агрегацию" и "объединение" для решения вашей проблемы, например:
data.agg(max($"district").as("maxd")).as("d1").join(data.as("d2"), $"d1.maxd" === $"d2.district").select($"address",$"district").show()
данные - это ваш DataFrame. Это будет полезно для вас
Вы можете использовать функцию сортировки на фрейме данных и упорядочить ее по убыванию. Тогда просто используйте головную функцию, вы получите требуемый результат.
Вот пример кода.
import org.apache.spark.sql.functions._
val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("/user/userapp/sample.csv");
val a = df.sort(desc("district")).head
вот вывод.