Рекурсивный cte в спарк SQL

; WITH  Hierarchy as 
        (
            select distinct PersonnelNumber
            , Email
            , ManagerEmail 
            from dimstage
            union all
            select e.PersonnelNumber
            , e.Email           
            , e.ManagerEmail 
            from dimstage  e
            join Hierarchy as  h on e.Email = h.ManagerEmail
        )
        select * from Hierarchy

Можете ли вы помочь достичь того же в SPARK SQL

4 ответа

Это довольно поздно, но сегодня я попытался реализовать рекурсивный запрос cte с помощью PySpark SQL.

Здесь у меня есть простой фрейм данных. Я хочу найти НОВЕЙШИЙ идентификатор каждого идентификатора.

Исходный фрейм данных:

+-----+-----+
|OldID|NewID|
+-----+-----+
|    1|    2|
|    2|    3|
|    3|    4|
|    4|    5|
|    6|    7|
|    7|    8|
|    9|   10|
+-----+-----+

Результат хочу:

+-----+-----+
|OldID|NewID|
+-----+-----+
|    1|    5|
|    2|    5|
|    3|    5|
|    4|    5|
|    6|    8|
|    7|    8|
|    9|   10|
+-----+-----+

Вот мой код:

df = sqlContext.createDataFrame([(1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8),(9, 10)], "OldID integer,NewID integer").checkpoint().cache()

dfcheck = df.drop('NewID')
dfdistinctID = df.select('NewID').distinct()
dfidfinal = dfdistinctID.join(dfcheck, [dfcheck.OldID == dfdistinctID.NewID], how="left_anti") #We find the IDs that have not been replaced

dfcurrent = df.join(dfidfinal, [dfidfinal.NewID == df.NewID], how="left_semi").checkpoint().cache() #We find the the rows that are related to the IDs that have not been replaced, then assign them to the dfcurrent dataframe.
dfresult = dfcurrent
dfdifferentalias = df.select(df.OldID.alias('id1'), df.NewID.alias('id2')).checkpoint().cache()

while dfcurrent.count() > 0:
  dfcurrent = dfcurrent.join(broadcast(dfdifferentalias), [dfcurrent.OldID == dfdifferentalias.id2], how="inner").select(dfdifferentalias.id1.alias('OldID'), dfcurrent.NewID.alias('NewID')).cache()
  dfresult = dfresult.unionAll(dfcurrent)

display(dfresult.orderBy('OldID'))

Скриншот записной книжки Databricks

Я знаю, что производительность довольно плохая, но, по крайней мере, она дает нужный мне ответ.

Я впервые отправляю ответ на StackOverFlow, так что простите меня, если я допустил ошибку.

Это невозможно при использовании SPARK SQL. Предложение WITH существует, но не для CONNECT BY, как, например, в ORACLE или рекурсии в DB2.

Документация Spark предоставляет «CTE в определении CTE». Это воспроизводится ниже:

      -- CTE in CTE definition
WITH t AS (
    WITH t2 AS (SELECT 1)
    SELECT * FROM t2
)
SELECT * FROM t;
+---+
|  1|
+---+
|  1|
+---+

Вы можете расширить это до нескольких вложенных запросов, но синтаксис может быстро стать неудобным. Мое предложение состоит в том, чтобы использовать комментарии, чтобы было понятно, откуда берется следующий оператор select. По сути, начните с первого запроса и поместите дополнительные операторы CTE выше и ниже по мере необходимости:

      WITH t3 AS (
WITH t2 AS (
WITH t1 AS (SELECT distinct b.col1
            FROM data_a as a, data_b as b
            WHERE a.col2 = b.col2
            AND a.col3 = b.col3
-- select from t1
            )
            SELECT distinct b.col1, b.col2, b.col3
            FROM t1 as a, data_b as b
            WHERE a.col1 = b.col1
-- select from t2
            )
            SELECT distinct b.col1
            FROM t2 as a, data_b as b
            WHERE a.col2 = b.col2
            AND a.col3 = b.col3
-- select from t3
            )
            SELECT distinct b.col1, b.col2, b.col3
            FROM t3 as a, data_b as b
            WHERE a.col1 = b.col1;

Вы можете рекурсивно использоватьcreateOrReplaceTempViewпостроить рекурсивный запрос. Это не будет быстро и красиво, но это работает. Следуя примеру @Pblade, PySpark:

      def recursively_resolve(df):
    rec = df.withColumn('level', F.lit(0))
    
    sql = """
        select this.oldid
             , coalesce(next.newid, this.newid) as newid
             , this.level + case when next.newid is not null then 1 else 0 end as level
             , next.newid is not null as is_resolved
          from rec this
          left outer
          join rec next
            on next.oldid = this.newid
    """
    find_next = True
    while find_next:
        rec.createOrReplaceTempView("rec")
        rec = spark.sql(sql)
        # check if any rows resolved in this iteration
        # go deeper if they did
        find_next = rec.selectExpr("ANY(is_resolved = True)").collect()[0][0]
        
    return rec.drop('is_resolved')
        

Затем:

      src = spark.createDataFrame([(1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8),(9, 10)], "OldID integer,NewID integer")
result = recursively_resolve(src)
result.show()

Отпечатки:

      +-----+-----+-----+
|oldid|newid|level|
+-----+-----+-----+
|    2|    5|    2|
|    4|    5|    0|
|    3|    5|    1|
|    7|    8|    0|
|    6|    8|    1|
|    9|   10|    0|
|    1|    5|    2|
+-----+-----+-----+
Другие вопросы по тегам