UDF в Spark SQL DSL
Я пытаюсь использовать DSL поверх чистого SQL в заданиях Spark SQL, но у меня не получается работать с UDF.
sqlContext.udf.register("subdate",(dateTime: Long)=>dateTime.toString.dropRight(6))
Это не работает
rdd1.toDF.join(rdd2.toDF).where("subdate(rdd1(date_time)) === subdate(rdd2(dateTime))")
Я также хотел бы добавить еще одно условие соединения, как в этом работающем чистом SQL
val results=sqlContext.sql("select * from rdd1 join rdd2 on rdd1.id=rdd2.idand subdate(rdd1.date_time)=subdate(rdd2.dateTime)")
Спасибо за вашу помощь
1 ответ
Выражение SQL, которое вы передаете where
Метод неверен по крайней мере по нескольким причинам:
===
этоColumn
метод не является допустимым равенством SQL. Вы должны использовать один знак равенства=
- обозначение в скобках (
table(column)
) не является допустимым способом ссылки на столбцы в SQL. В этом контексте он будет распознан как вызов функции. SQL использует точечную нотацию (table.column
) - даже если это не было ни
rdd1
ниrdd2
действительны псевдонимы таблиц
Поскольку имена столбцов выглядят однозначно, вы можете просто использовать следующий код:
df1.join(df2).where("subdate(date_time) = subdate(dateTime)")
Если бы это было не так, использование точечного синтаксиса не сработало бы без предоставления псевдонимов. См., Например, Использование метода Data Data Frame "as".
Более того, регистрация UDF имеет смысл, главным образом, когда вы все время используете сырой SQL. Если вы хотите использовать DataFrame
API лучше использовать UDF напрямую:
import org.apache.spark.sql.functions.udf
val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6))
val df1 = rdd1.toDF
val df2 = rdd2.toDF
df1.join(df2, subdate($"date_time") === subdate($"dateTime"))
или если имена столбцов были неоднозначными:
df1.join(df2, subdate(df1("date_time")) === subdate(df2("date_time")))
Наконец, для простых функций, подобных этой, лучше составлять встроенные выражения, чем создавать пользовательские функции.