Spark: рассчитывает на окно, не работающее в течение миллисекунды
Вы можете создать окно для подсчета количества повторений записи за последние 7 дней. Однако, если вы попытаетесь посмотреть, сколько раз запись происходила на миллисекундном уровне, она ломается.
Короче говоря, ниже функция df.timestamp.astype('Timestamp').cast("long")
преобразует только временную метку до длительности секунды. Это игнорирует миллисекунду. Как превратить всю метку времени, включая миллисекунды, в длинную. Вам нужно, чтобы значение было длинным, чтобы оно работало с окном.
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import unix_timestamp
df = sqlContext.createDataFrame([
("a", "u8u", "2018-02-02 05:46:41.438357"),
("a", "u8u", "2018-02-02 05:46:41.439377"),
("a", "a3a", "2018-02-02 09:48:34.081818"),
("a", "a3a", "2018-02-02 09:48:34.095586"),
("a", "g8g", "2018-02-02 09:48:56.006206"),
("a", "g8g", "2018-02-02 09:48:56.007974"),
("a", "9k9", "2018-02-02 12:50:48.000000"),
("a", "9k9", "2018-02-02 12:50:48.100000"),
], ["person_id", "session_id", "timestamp"])
df = df.withColumn('unix_ts',df.timestamp.astype('Timestamp').cast("long"))
df = df.withColumn("DayOfWeek",F.date_format(df.timestamp, 'EEEE'))
w = Window.partitionBy('person_id','DayOfWeek').orderBy('unix_ts').rangeBetween(-86400*7,-1)
df = df.withColumn('count',F.count('unix_ts').over(w))
df.sort(df.unix_ts).show(20,False)
+---------+----------+--------------------------+----------+---------+-----+
|person_id|session_id|timestamp |unix_ts |DayOfWeek|count|
+---------+----------+--------------------------+----------+---------+-----+
|a |u8u |2018-02-02 05:46:41.438357|1517572001|Friday |0 |
|a |u8u |2018-02-02 05:46:41.439377|1517572001|Friday |0 |
|a |a3a |2018-02-02 09:48:34.081818|1517586514|Friday |2 |
|a |a3a |2018-02-02 09:48:34.095586|1517586514|Friday |2 |
|a |g8g |2018-02-02 09:48:56.006206|1517586536|Friday |4 |
|a |g8g |2018-02-02 09:48:56.007974|1517586536|Friday |4 |
|a |9k9 |2018-02-02 12:50:48.000000|1517597448|Friday |6 |
|a |9k9 |2018-02-02 12:50:48.100000|1517597448|Friday |6 |
+---------+----------+--------------------------+----------+---------+-----+
Количество должно быть 0,1,2,3,4,5... вместо 0,0,2,2,4,4,...
1 ответ
Ты можешь использовать pyspark.sql.functions.unix_timestamp()
преобразовать строковый столбец во временную метку вместо приведения к long
,
import pyspark.sql.functions as F
df.select(
"timestamp",
F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("unix_ts")
).show(truncate=False)
#+--------------------------+----------+
#|timestamp |unix_ts |
#+--------------------------+----------+
#|2018-02-02 05:46:41.438357|1517568839|
#|2018-02-02 05:46:41.439377|1517568840|
#|2018-02-02 09:48:34.081818|1517582995|
#|2018-02-02 09:48:34.095586|1517583009|
#|2018-02-02 09:48:56.006206|1517582942|
#|2018-02-02 09:48:56.007974|1517582943|
#|2018-02-02 12:50:48.862644|1517594710|
#|2018-02-02 12:50:49.981848|1517594830|
#+--------------------------+----------+
Второй аргумент unix_timestamp()
это строка формата В вашем случае используйте "yyyy-MM-dd HH:mm:ss.SSSSSS"
,
Соответствующее изменение, примененное к вашему коду:
df = df.withColumn(
'unix_ts',
F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS")
)
df = df.withColumn("DayOfWeek", F.date_format(df.timestamp, 'EEEE'))
w = Window.partitionBy('person_id','DayOfWeek').orderBy('unix_ts').rangeBetween(-86400*7,-1)
df = df.withColumn('count',F.count('unix_ts').over(w))
df.sort(df.unix_ts).show(20,False)
#+---------+----------+--------------------------+----------+---------+-----+
#|person_id|session_id|timestamp |unix_ts |DayOfWeek|count|
#+---------+----------+--------------------------+----------+---------+-----+
#|a |u8u |2018-02-02 05:46:41.438357|1517568839|Friday |0 |
#|a |u8u |2018-02-02 05:46:41.439377|1517568840|Friday |1 |
#|a |g8g |2018-02-02 09:48:56.006206|1517582942|Friday |2 |
#|a |g8g |2018-02-02 09:48:56.007974|1517582943|Friday |3 |
#|a |a3a |2018-02-02 09:48:34.081818|1517582995|Friday |4 |
#|a |a3a |2018-02-02 09:48:34.095586|1517583009|Friday |5 |
#|a |9k9 |2018-02-02 12:50:48.862644|1517594710|Friday |6 |
#|a |9k9 |2018-02-02 12:50:49.981848|1517594830|Friday |7 |
#+---------+----------+--------------------------+----------+---------+-----+