Функции Spark Window - диапазон между датами

У меня Spark SQL DataFrame с данными, и я пытаюсь получить все строки, предшествующие текущей строке в заданном диапазоне дат. Так, например, я хочу, чтобы все строки из 7 дней назад предшествовали данной строке. Я понял, что мне нужно использовать Window Function лайк:

Window \
    .partitionBy('id') \
    .orderBy('start')

и тут возникает проблема. Я хочу иметь rangeBetween 7 дней, но в документации по Spark ничего нет. Spark даже предоставляет такую ​​возможность? Сейчас я просто получаю все предыдущие строки с помощью:

.rowsBetween(-sys.maxsize, 0)

но хотел бы добиться чего-то вроде:

.rangeBetween("7 days", 0)

Если кто-нибудь может помочь мне в этом, я буду очень благодарен. Заранее спасибо!

3 ответа

Решение

Spark >= 2,3

Начиная с Spark 2.3 можно использовать интервальные объекты с использованием SQL API, но DataFrame Поддержка API еще не завершена.

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+       
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Искра < 2.3

Насколько я знаю, это невозможно напрямую ни в Spark, ни в Hive. Оба требуют ORDER BY пункт используется с RANGE быть числовым Самая близкая вещь, которую я нашел, - преобразование в метку времени и работа на секундах. Если предположить, start столбец содержит date тип:

from pyspark.sql import Row

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

Маленький помощник и определение окна:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col


# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 

Наконец запрос:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Далеко не красиво, но работает.


* Hive Language Manual, Типы

Spark 3.3 выпущен, но...

Ответ может быть таким же старым, как Spark 1.5.0: datediff.

datediff(col_name, '1000')вернет целое число дней от 1000-01-01 до col_name.

В качестве первого аргумента он принимает даты, метки времени и даже строки.
В качестве второго он даже принимает1000.


Ответ

Разница дат в днях - в зависимости от типа данных столбца заказа :

дата

  • Искра 3.1+

            .orderBy(F.expr("unix_date(col_name)")).rangeBetween(-7, 0)
    
  • Искра 2.1+

     

отметка времени

  • Искра 2.1+

     

long - время UNIX в микросекундах (например, 1672534861000000)

  • Искра 2.1+

            .orderBy(F.col("col_name") / 86400_000000).rangeBetween(-7, 0)
    

long - время UNIX в миллисекундах (например, 1672534861000)

  • Искра 2.1+

            .orderBy(F.col("col_name") / 86400_000).rangeBetween(-7, 0)
    

long - время UNIX в секундах (например, 1672534861)

  • Искра 2.1+

            .orderBy(F.col("col_name") / 86400).rangeBetween(-7, 0)
    

длинный в формате ггггММдд

  • Искра 3.3+

            .orderBy(F.expr("unix_date(to_date(col_name, 'yyyyMMdd'))")).rangeBetween(-7, 0)
    
  • Искра 3.1+

            .orderBy(F.expr("unix_date(to_date(cast(col_name as string), 'yyyyMMdd'))")).rangeBetween(-7, 0)
    
  • Искра 2.2+

            .orderBy(F.expr("datediff(to_date(cast(col_name as string), 'yyyyMMdd'), '1000')")).rangeBetween(-7, 0)
    
  • Искра 2.1+

            .orderBy(F.unix_timestamp(F.col("col_name").cast('string'), 'yyyyMMdd') / 86400).rangeBetween(-7, 0)
    

строка в формате даты «гггг-мм-дд»

  • Искра 3.1+

            .orderBy(F.expr("unix_date(to_date(col_name))")).rangeBetween(-7, 0)
    
  • Искра 2.1+

     

строка в другом формате даты (например, «ММ-дд-гггг»)

  • Искра 3.1+

            .orderBy(F.expr("unix_date(to_date(col_name, 'MM-dd-yyyy'))")).rangeBetween(-7, 0)
    
  • Искра 2.2+

            .orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy'), '1000')")).rangeBetween(-7, 0)
    
  • Искра 2.1+

            .orderBy(F.unix_timestamp("col_name", 'MM-dd-yyyy') / 86400).rangeBetween(-7, 0)
    

строка в формате метки времени «гггг-ММ-дд ЧЧ: мм: сс»

  • Искра 2.1+

            .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

строка в другом формате метки времени (например, «ММ-дд-гггг ЧЧ: мм: сс»)

  • Искра 2.2+

            .orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy HH:mm:ss'), '1000')")).rangeBetween(-7, 0)
    

Фантастическое решение @zero323, если вы хотите работать с минутами, а не днями, как я, и вам не нужно разбивать разделы с помощью идентификатора, поэтому вам нужно только изменить простую часть кода, как я показываю:

df.createOrReplaceTempView("df")
spark.sql(
    """SELECT *, sum(total) OVER (
        ORDER BY CAST(reading_date AS timestamp) 
        RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
     ) AS sum_total FROM df""").show()
Другие вопросы по тегам