Pyspark - Как использовать функции lag и rangeBetween для значений меток времени?

У меня есть данные, которые выглядят так:

userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769

Я хотел бы добавить новый логический столбец, который отмечает true, если есть 2 или болееuserid в течение 5 минут окно в том же location_point, У меня была идея использовать lag функция для поиска по окну, разделенному userid и с диапазоном между текущей отметкой времени и следующими 5 минутами:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

days = lambda i: i * 60*5 

windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))

lastURN = F.lag(col("location_point"), 1).over(windowSpec)
visitCheck = (last_location_point == output.location_pont)
output.withColumn("visit_check", visitCheck).select("userid","eventtime", "location_pont", "visit_check")

Этот код дает мне исключение анализа, когда я использую функцию RangeBetween:

AnalysisException: u'Window Frame RANGE МЕЖДУ ТЕКУЩЕЙ СТРОКОЙ И 1500 СЛЕДУЮЩИХ должны соответствовать требуемому кадру ROWS МЕЖДУ 1 PRECEDING И 1 PRECEDING;

Знаете ли вы, как решить эту проблему?

2 ответа

Решение

Учитывая ваши данные: давайте добавим столбец с отметкой времени в секундах:

df = df.withColumn('timestamp',df_taf.eventtime.astype('Timestamp').cast("long"))
df.show()

+--------+-------------------+--------------+----------+
|  userid|          eventtime|location_point| timestamp|  
+--------+-------------------+--------------+----------+
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|
+--------+-------------------+--------------+----------+  

Теперь давайте определим оконную функцию с разделением по location_point, порядком по временной отметке и диапазоном от -300 с до текущего времени. Мы можем посчитать количество элементов в этом окне и поместить эти данные в столбец с именем 'occurences in_5_min':

w = Window.partitionBy('location_point').orderBy('timestamp').rangeBetween(-60*5,0)
df = df.withColumn('occurrences_in_5_min',F.count('timestamp').over(w))
df.show()

+--------+-------------------+--------------+----------+--------------------+
|  userid|          eventtime|location_point| timestamp|occurrences_in_5_min|
+--------+-------------------+--------------+----------+--------------------+
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|                   1|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|                   1|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|                   1|
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|                   1|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|                   2|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|                   1|
+--------+-------------------+--------------+----------+--------------------+

Теперь вы можете добавить желаемый столбец с True, если количество событий строго больше 1 за последние 5 минут в определенном месте:

add_bool = udf(lambda col : True if col>1 else False, BooleanType())
df = df.withColumn('already_occured',add_bool('occurrences_in_5_min'))
df.show()

+--------+-------------------+--------------+----------+--------------------+---------------+
|  userid|          eventtime|location_point| timestamp|occurrences_in_5_min|already_occured|
+--------+-------------------+--------------+----------+--------------------+---------------+
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|                   1|          false|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|                   1|          false|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|                   1|          false|
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|                   1|          false|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|                   2|           true|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|                   1|          false|
+--------+-------------------+--------------+----------+--------------------+---------------+

rangeBetween просто не имеет смысла для неагрегированных функций, таких как lag, lag всегда принимает определенную строку, обозначаемую аргументом offset, поэтому указывать frame бессмысленно.

Чтобы получить окно во временные ряды, вы можете использовать window группировка со стандартными агрегатами:

from pyspark.sql.functions import window,  countDistinct


(df
    .groupBy("location_point", window("eventtime", "5 minutes"))
    .agg( countDistinct("userid")))

Вы можете добавить больше аргументов для изменения длительности слайда.

Вы можете попробовать что-то подобное с оконными функциями, если вы разделите location:

windowSpec = (W.partitionBy(col("location"))
  .orderBy(col("eventtime").cast("timestamp").cast("long"))
  .rangeBetween(0, days(5)))


df.withColumn("id_count", countDistinct("userid").over(windowSpec))
Другие вопросы по тегам