Как использовать оконную функцию для подсчёта дней недели в Pyspark 2.1

С помощью приведенного ниже набора данных pyspark (2.1), как использовать оконную функцию, которая будет подсчитывать, сколько раз день недели текущей записи появлялся в течение последних 28 дней.

Пример кадра данных:

from pyspark.sql import functions as F
df = sqlContext.createDataFrame([
    ("a", "1", "2018-01-01 12:01:01","Monday"),
        ("a", "13", "2018-01-01 14:01:01","Monday"),
        ("a", "22", "2018-01-02 22:01:01","Tuesday"),
        ("a", "43", "2018-01-08 01:01:01","Monday"),
        ("a", "43", "2018-01-09 01:01:01","Tuesday"),
        ("a", "74", "2018-01-10 12:01:01","Wednesday"),
        ("a", "95", "2018-01-15 06:01:01","Monday"),
], ["person_id", "other_id", "timestamp","dow"])


df.withColumn("dow_count",`some window function`)

Возможное окно

from pyspark.sql import Window
from pyspark.sql import functions as F
Days_28 = (86400 * 28)
window= Window.partitionBy("person_id").orderBy('timestamp').rangeBetween(-Days_30, -1)
## I know this next line is wrong
df.withColumn("dow_count",F.sum(F.when(Current_day=windowed_day,1).otherwise(0)).over(window))

Пример вывода

df.show()

+---------+--------+-------------------+---------+---------+
|person_id|other_id|          timestamp|      dow|dow_count|
+---------+--------+-------------------+---------+---------+
|        a|       1|2018-01-01 12:01:01|   Monday|0        |
|        a|      13|2018-01-01 14:01:01|   Monday|1        |
|        a|      22|2018-01-02 22:01:01|  Tuesday|0        |
|        a|      43|2018-01-08 01:01:01|   Monday|2        |
|        a|      43|2018-01-09 01:01:01|  Tuesday|1        |
|        a|      74|2018-01-10 12:01:01|Wednesday|0        |
|        a|      95|2018-01-15 06:01:01|   Monday|3        |
+---------+--------+-------------------+---------+---------+

2 ответа

Используйте F.row_number(), окно, разделенное на (person_id, dow) и логику с вашим rangeBetween() следует заменить на where():

from datetime import timedelta, datetime

N_days = 28
end = datetime.combine(datetime.today(), datetime.min.time())
start = end - timedelta(days=N_days)

window = Window.partitionBy("person_id", "dow").orderBy('timestamp')

df.where((df.timestamp < end) & (df.timestamp >= start)) \
  .withColumn('dow_count', F.row_number().over(window)-1) \
  .show()

Я понял это и решил поделиться.

Сначала создайте метку времени Unix и приведите ее к long. Затем разделите по человеку и дню недели. Наконец, используйте функцию подсчета над окном.

from pyspark.sql import functions as F
df = df.withColumn('unix_ts',df.timestamp.astype('Timestamp').cast("long"))

w = Window.partitionBy('person_id','dow').orderBy('unix_ts').rangeBetween(-86400*15,-1)
df = df.withColumn('occurrences_in_7_days',F.count('unix_ts').over(w))
df.sort(df.unix_ts).show()

Бонус: Как создать фактический день недели из отметки времени.

df = df.withColumn("DayOfWeek",F.date_format(df.timestamp, 'EEEE'))

Я не смог бы сделать это без советов от jxc и этой статьи stackru.

Другие вопросы по тегам