Получение исключения при использовании предложения Dataframe where

Я пытаюсь выполнить пример операции на Datarame, где пункт.

Вот мои примерные данные таблицы:

address       district
hyderabad      001
delhi          002
mumbai         003

Теперь мне нужно оценить адрес, макс (район) с помощью DataFrame.

Результат будет таким:

Мумбаи 003

Обходной путь:

это код, который я пробовал до сих пор..,

SparkConf conf = new SparkConf();
        conf.set("spark.app.name", "max");
        conf.set("spark.master", "local");
        conf.set("spark.ui.port", "7077");

        SparkContext  ctx=new SparkContext(conf);       
        SQLContext sqlContext = new SQLContext(ctx);
        DataFrame df = sqlContext.read()
            .format("com.databricks.spark.csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .load("/Users/hadoop/Downloads/SacramentocrimeJanuary2006.csv");
        //df.registerTempTable("consumer");
        //Row[] result = df.orderBy("cdatetime").select("cdatetime","address").collect();
        //DataFrame a = df.select("address","district").agg(functions.count("district"),functions.col("address")).orderBy("address");
        DataFrame b =df.select("address","district").where("district=max(district)");
        b.show();
        }

Вот мое исключение:

Cannot evaluate expression: (max(input[1, IntegerType]),mode=Complete,isDistinct=false)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.genCode(Expression.scala:233)
    at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.genCode(interfaces.scala:73)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:419)
    at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:401)
    at org.apache.spark.sql.catalyst.expressions.EqualTo.genCode(predicates.scala:379)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:106)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$gen$2.apply(Expression.scala:102)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.expressions.Expression.gen(Expression.scala:102)
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:42)
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:33)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:635)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:632)
    at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:242)
    at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:71)
    at org.apache.spark.sql.execution.Filter$$anonfun$2.apply(basicOperators.scala:70)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/12/09 10:50:57 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)

2 ответа

Вы должны использовать "агрегацию" и "объединение" для решения вашей проблемы, например:

data.agg(max($"district").as("maxd")).as("d1").join(data.as("d2"), $"d1.maxd" === $"d2.district").select($"address",$"district").show()

данные - это ваш DataFrame. Это будет полезно для вас

Вы можете использовать функцию сортировки на фрейме данных и упорядочить ее по убыванию. Тогда просто используйте головную функцию, вы получите требуемый результат.

Вот пример кода.

import org.apache.spark.sql.functions._
val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("/user/userapp/sample.csv");
val a = df.sort(desc("district")).head

вот вывод.

введите описание изображения здесь

Другие вопросы по тегам