Как использовать аналитические / оконные функции в Spark Java?
Я пытаюсь использовать аналитическую / оконную функцию last_value в Spark Java.
Netezza Query:
select sno, name, addr1, addr2, run_dt,
last_value(addr1 ignore nulls) over (partition by sno, name, addr1, addr2, run_dt order by beg_ts , end_ts rows between unbounded preceding and unbounded following ) as last_addr1
from daily
Мы хотим реализовать этот запрос в Spark Java (без использования HiveSQLContext):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.WindowFunctionFrame;
SparkConf conf = new SparkConf().setMaster("local").setAppName("Agg");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<Stgdailydtl> daily = sc.textFile("C:\\Testing.txt").map(
new Function<String, Stgdailydtl>() {
private static final long serialVersionUID = 1L;
public Stgdailydtl call(String line) throws Exception {
String[] parts = line.split(",");
Stgdailydtl daily = new Stgdailydtl();
daily.setSno(Integer.parseInt(parts[0].trim()));
.....
return daily;
}
});
DataFrame schemaDailydtl = sqlContext.createDataFrame(daily, Stgdailydtl.class);
schemaDailydtl.registerTempTable("daily");
WindowSpec ws = Window.partitionBy("sno, name, addr1, addr2, run_dt").orderBy("beg_ts , end_ts").rowsBetween(0, 100000);
DataFrame df = sqlContext.sql("select sno, name, addr1, addr2, run_dt "
+ "row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from daily ");
}
}
Ошибка:
Exception in thread "main" java.lang.RuntimeException: [1.110] failure: ``union'' expected but `(' found
select stg.mach_id, stg.msrmt_gbl_id, stg.msrmt_dsc, stg.elmt_dsc, stg.elmt_dsc_grp_concat, row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from stgdailydtl stg
^
at scala.sys.package$.error(package.scala:27)
Я не мог понять, как использовать WindowSpec/Window объект. Пожалуйста, предложите по этому вопросу. Спасибо за вашу помощь
1 ответ
Вы смешиваете синтаксис кадра данных и синтаксиса sql - в частности, вы создали WindowSpec, но затем не использовали его.
Импортировать org.apache.spark.sql.functions
чтобы получить row_number
затем создайте столбец, который вы пытаетесь выбрать:
Column rowNum = functions.row_number().over(ws)
Затем выберите его с помощью API dataframe:
df.select(each, column, you, want, rowNum)
Мой синтаксис может быть немного не так, я привык к Scala или Python, но суть что-то в этом роде.