UDF в Spark SQL DSL

Я пытаюсь использовать DSL поверх чистого SQL в заданиях Spark SQL, но у меня не получается работать с UDF.

sqlContext.udf.register("subdate",(dateTime: Long)=>dateTime.toString.dropRight(6))

Это не работает

rdd1.toDF.join(rdd2.toDF).where("subdate(rdd1(date_time)) === subdate(rdd2(dateTime))")

Я также хотел бы добавить еще одно условие соединения, как в этом работающем чистом SQL

val results=sqlContext.sql("select * from rdd1 join rdd2 on rdd1.id=rdd2.idand subdate(rdd1.date_time)=subdate(rdd2.dateTime)")

Спасибо за вашу помощь

1 ответ

Решение

Выражение SQL, которое вы передаете where Метод неверен по крайней мере по нескольким причинам:

  • === это Column метод не является допустимым равенством SQL. Вы должны использовать один знак равенства =
  • обозначение в скобках (table(column)) не является допустимым способом ссылки на столбцы в SQL. В этом контексте он будет распознан как вызов функции. SQL использует точечную нотацию (table.column)
  • даже если это не было ни rdd1 ни rdd2 действительны псевдонимы таблиц

Поскольку имена столбцов выглядят однозначно, вы можете просто использовать следующий код:

df1.join(df2).where("subdate(date_time) = subdate(dateTime)")

Если бы это было не так, использование точечного синтаксиса не сработало бы без предоставления псевдонимов. См., Например, Использование метода Data Data Frame "as".

Более того, регистрация UDF имеет смысл, главным образом, когда вы все время используете сырой SQL. Если вы хотите использовать DataFrame API лучше использовать UDF напрямую:

import org.apache.spark.sql.functions.udf

val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6)) 

val df1 = rdd1.toDF
val df2 = rdd2.toDF

df1.join(df2, subdate($"date_time") === subdate($"dateTime"))

или если имена столбцов были неоднозначными:

df1.join(df2, subdate(df1("date_time")) === subdate(df2("date_time")))

Наконец, для простых функций, подобных этой, лучше составлять встроенные выражения, чем создавать пользовательские функции.

Другие вопросы по тегам