Рекурсивный cte в спарк SQL
; WITH Hierarchy as
(
select distinct PersonnelNumber
, Email
, ManagerEmail
from dimstage
union all
select e.PersonnelNumber
, e.Email
, e.ManagerEmail
from dimstage e
join Hierarchy as h on e.Email = h.ManagerEmail
)
select * from Hierarchy
Можете ли вы помочь достичь того же в SPARK SQL
4 ответа
Это довольно поздно, но сегодня я попытался реализовать рекурсивный запрос cte с помощью PySpark SQL.
Здесь у меня есть простой фрейм данных. Я хочу найти НОВЕЙШИЙ идентификатор каждого идентификатора.
Исходный фрейм данных:
+-----+-----+
|OldID|NewID|
+-----+-----+
| 1| 2|
| 2| 3|
| 3| 4|
| 4| 5|
| 6| 7|
| 7| 8|
| 9| 10|
+-----+-----+
Результат хочу:
+-----+-----+
|OldID|NewID|
+-----+-----+
| 1| 5|
| 2| 5|
| 3| 5|
| 4| 5|
| 6| 8|
| 7| 8|
| 9| 10|
+-----+-----+
Вот мой код:
df = sqlContext.createDataFrame([(1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8),(9, 10)], "OldID integer,NewID integer").checkpoint().cache()
dfcheck = df.drop('NewID')
dfdistinctID = df.select('NewID').distinct()
dfidfinal = dfdistinctID.join(dfcheck, [dfcheck.OldID == dfdistinctID.NewID], how="left_anti") #We find the IDs that have not been replaced
dfcurrent = df.join(dfidfinal, [dfidfinal.NewID == df.NewID], how="left_semi").checkpoint().cache() #We find the the rows that are related to the IDs that have not been replaced, then assign them to the dfcurrent dataframe.
dfresult = dfcurrent
dfdifferentalias = df.select(df.OldID.alias('id1'), df.NewID.alias('id2')).checkpoint().cache()
while dfcurrent.count() > 0:
dfcurrent = dfcurrent.join(broadcast(dfdifferentalias), [dfcurrent.OldID == dfdifferentalias.id2], how="inner").select(dfdifferentalias.id1.alias('OldID'), dfcurrent.NewID.alias('NewID')).cache()
dfresult = dfresult.unionAll(dfcurrent)
display(dfresult.orderBy('OldID'))
Скриншот записной книжки Databricks
Я знаю, что производительность довольно плохая, но, по крайней мере, она дает нужный мне ответ.
Я впервые отправляю ответ на StackOverFlow, так что простите меня, если я допустил ошибку.
Это невозможно при использовании SPARK SQL. Предложение WITH существует, но не для CONNECT BY, как, например, в ORACLE или рекурсии в DB2.
Документация Spark предоставляет «CTE в определении CTE». Это воспроизводится ниже:
-- CTE in CTE definition
WITH t AS (
WITH t2 AS (SELECT 1)
SELECT * FROM t2
)
SELECT * FROM t;
+---+
| 1|
+---+
| 1|
+---+
Вы можете расширить это до нескольких вложенных запросов, но синтаксис может быстро стать неудобным. Мое предложение состоит в том, чтобы использовать комментарии, чтобы было понятно, откуда берется следующий оператор select. По сути, начните с первого запроса и поместите дополнительные операторы CTE выше и ниже по мере необходимости:
WITH t3 AS (
WITH t2 AS (
WITH t1 AS (SELECT distinct b.col1
FROM data_a as a, data_b as b
WHERE a.col2 = b.col2
AND a.col3 = b.col3
-- select from t1
)
SELECT distinct b.col1, b.col2, b.col3
FROM t1 as a, data_b as b
WHERE a.col1 = b.col1
-- select from t2
)
SELECT distinct b.col1
FROM t2 as a, data_b as b
WHERE a.col2 = b.col2
AND a.col3 = b.col3
-- select from t3
)
SELECT distinct b.col1, b.col2, b.col3
FROM t3 as a, data_b as b
WHERE a.col1 = b.col1;
Вы можете рекурсивно использоватьcreateOrReplaceTempView
построить рекурсивный запрос. Это не будет быстро и красиво, но это работает. Следуя примеру @Pblade, PySpark:
def recursively_resolve(df):
rec = df.withColumn('level', F.lit(0))
sql = """
select this.oldid
, coalesce(next.newid, this.newid) as newid
, this.level + case when next.newid is not null then 1 else 0 end as level
, next.newid is not null as is_resolved
from rec this
left outer
join rec next
on next.oldid = this.newid
"""
find_next = True
while find_next:
rec.createOrReplaceTempView("rec")
rec = spark.sql(sql)
# check if any rows resolved in this iteration
# go deeper if they did
find_next = rec.selectExpr("ANY(is_resolved = True)").collect()[0][0]
return rec.drop('is_resolved')
Затем:
src = spark.createDataFrame([(1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8),(9, 10)], "OldID integer,NewID integer")
result = recursively_resolve(src)
result.show()
Отпечатки:
+-----+-----+-----+
|oldid|newid|level|
+-----+-----+-----+
| 2| 5| 2|
| 4| 5| 0|
| 3| 5| 1|
| 7| 8| 0|
| 6| 8| 1|
| 9| 10| 0|
| 1| 5| 2|
+-----+-----+-----+