Выберите последнюю запись метки времени после оконной операции для каждой группы данных с помощью Spark Scala
Я выполнил подсчет попыток (пользователь, приложение) за временной промежуток дня (86400). Я хочу извлечь строки с последней отметкой времени с помощью счетчика и удалить ненужные предыдущие счетчики. Убедитесь, что ваш ответ учитывает временное окно. Один пользователь с 1 устройством может делать несколько попыток в день или неделю, я хочу иметь возможность извлекать эти конкретные моменты с окончательным счетом в каждом конкретном окне.
Мой начальный набор данных выглядит так:
val df = sc.parallelize(Seq(
("user1", "iphone", "2017-12-22 10:06:18", "Success"),
("user1", "iphone", "2017-12-22 11:15:12", "failed"),
("user1", "iphone", "2017-12-22 12:06:18", "Success"),
("user1", "iphone", "2017-12-22 09:15:12", "failed"),
("user1", "iphone", "2017-12-20 10:06:18", "Success"),
("user1", "iphone", "2017-12-20 11:15:12", "failed"),
("user1", "iphone", "2017-12-20 12:06:18", "Success"),
("user1", "iphone", "2017-12-20 09:15:12", "failed"),
("user1", "android", "2017-12-20 09:25:20", "Success"),
("user1", "android", "2017-12-20 09:44:22", "Success"),
("user1", "android", "2017-12-20 09:58:22", "Success"),
("user1", "iphone", "2017-12-20 16:44:20", "Success"),
("user1", "iphone", "2017-12-20 16:44:25", "Success"),
("user1", "iphone", "2017-12-20 16:44:35", "Success")
)).toDF("username", "device", "date_time", "status")
Код, который я запустил и что я получил.
// Basically I'm looking 1 day which is 86400 seconds
val w1 = Window.partitionBy("username", "device")
.orderBy(col("date_time").cast("date_time").cast("long").desc)
.rangeBetween(-86400, 0)
val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))
Теперь у меня есть
// countEveryAttemptDF.show
+--------+--------------+---------------------+-------+--------+
|username|. device| date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
| user1| android| 2017-12-20 09:58:22|Success| 1|
| user1| android| 2017-12-20 09:44:22|Success| 2|
| user1| android| 2017-12-20 09:25:20|Success| 3|
| user1| iphone| 2017-12-22 12:06:18|Success| 1|
| user1| iphone| 2017-12-22 11:15:12| failed| 2|
| user1| iphone| 2017-12-22 10:06:18|Success| 3|
| user1| iphone| 2017-12-22 09:15:12| failed| 4|
| user1| iphone| 2017-12-20 16:44:35|Success| 1|
| user1| iphone| 2017-12-20 16:44:25|Success| 2|
| user1| iphone| 2017-12-20 16:44:20|Success| 3|
| user1| iphone| 2017-12-20 12:06:18|Success| 4|
| user1| iphone| 2017-12-20 11:15:12| failed| 5|
| user1| iphone| 2017-12-20 10:06:18|Success| 6|
| user1| iphone| 2017-12-20 09:15:12| failed| 7|
+--------+--------------+---------------------+-------+--------+
Что я хочу Поэтому я хочу получить последнюю метку времени вместе с ее счетом, убедившись, что я в одном и том же временном окне.
+--------+--------------+---------------------+-------+--------+
|username|. device| date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
| user1 | android | 2017-12-20 09:25:20|Success| 3|
| user1 | iphone | 2017-12-22 09:15:12| failed| 4|
| user1 | iphone | 2017-12-20 09:15:12| failed| 7|
+--------+--------------+---------------------+-------+--------+**
1 ответ
Вы почти там. Вы вычислили счет, посмотрев на дневной диапазон. Теперь все, что вам нужно сделать, это выяснить последнюю запись в этом однодневном диапазоне, что можно сделать, используя функцию last для той же самой оконной функции, но с обратным диапазоном.
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
def day(x: Int) = x * 86400
val w1 = Window.partitionBy("username", "device")
.orderBy(col("date_time").cast("timestamp").cast("long").desc)
.rangeBetween(-day(1), 0)
val w2 = Window.partitionBy("username", "device")
.orderBy(col("date_time").cast("timestamp").cast("long").desc)
.rangeBetween(0, day(1))
val countEveryAttemptDF = df.withColumn("attempts", count("application_id").over(w1))
.withColumn("att", last("attempts").over(w2))
.filter(col("attempts") === col("att"))
.drop("att")
который должен дать вам
+--------+--------------+---------------------+-------+--------+
|username| device| date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|user1 |android |2017-12-20 09:25:20 |Success|3 |
|user1 |iphone |2017-12-22 09:15:12 | Failed|4 |
|user1 |iphone |2017-12-20 09:15:12 | Failed|7 |
+--------+--------------+---------------------+-------+--------+
так же, как указано в комментариях ниже
В 1 дне 86400 секунд. Я хотел оглянуться назад на 1 день. Точно так же 3600 секунд это 1 час. И 604 800 секунд за 1 неделю
Вы можете изменить функцию дня на часы и недели, как показано ниже, и использовать их в окне. rangeBetween
def hour(x: Int) = x * 3600
def week(x: Int) = x * 604800
Я надеюсь, что ответ полезен