SparkSQL на pyspark: как генерировать временные ряды?
Я использую SparkSQL на pyspark для хранения некоторых таблиц PostgreSQL в DataFrames, а затем строю запрос, который генерирует несколько временных рядов на основе start
а также stop
столбцы типа date
,
Предположим, что my_table
содержит:
start | stop
-------------------------
2000-01-01 | 2000-01-05
2012-03-20 | 2012-03-23
В PostgreSQL это очень легко сделать:
SELECT generate_series(start, stop, '1 day'::interval)::date AS dt FROM my_table
и он сгенерирует эту таблицу:
dt
------------
2000-01-01
2000-01-02
2000-01-03
2000-01-04
2000-01-05
2012-03-20
2012-03-21
2012-03-22
2012-03-23
но как это сделать с помощью простого SparkSQL? Будет ли необходимо использовать UDF или некоторые методы DataFrame?
2 ответа
Предположим, у вас есть датафрейм df
от spark sql, попробуйте это
from pyspark.sql.functions as F
from pyspark.sql.types as T
def timeseriesDF(start, total):
series = [start]
for i xrange( total-1 ):
series.append(
F.date_add(series[-1], 1)
)
return series
df.withColumn("t_series", F.udf(
timeseriesDF,
T.ArrayType()
) ( df.start, F.datediff( df.start, df.stop ) )
).select(F.explode("t_series")).show()
EDIT
This creates a dataframe with one row containing an array of consecutive dates:
from pyspark.sql.functions import sequence, to_date, explode, col
spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month) as date")
+------------------------------------------+
| date |
+------------------------------------------+
| ["2018-01-01","2018-02-01","2018-03-01"] |
+------------------------------------------+
You can use the explode function to "pivot" this array into rows:
spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month) as date").withColumn("date", explode(col("date"))
+------------+
| date |
+------------+
| 2018-01-01 |
| 2018-02-01 |
| 2018-03-01 |
+------------+
(End of edit)
Spark v2.4 support sequence
function:
sequence(start, stop, step) - Generates an array of elements from start to stop (inclusive), incrementing by step. The type of the returned elements is the same as the type of argument expressions.
Supported types are: byte, short, integer, long, date, timestamp.
Examples:
SELECT sequence(1, 5);
[1,2,3,4,5]
SELECT sequence(5, 1);
[5,4,3,2,1]
SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month);
[2018-01-01,2018-02-01,2018-03-01]
https://docs.databricks.com/spark/latest/spark-sql/language-manual/functions.html
Существующие ответы будут работать, но очень неэффективны. Вместо этого лучше использовать range
а затем приведите данные. В питоне
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
def generate_series(start, stop, interval):
"""
:param start - lower bound, inclusive
:param stop - upper bound, exclusive
:interval int - increment interval in seconds
"""
spark = SparkSession.builder.getOrCreate()
# Determine start and stops in epoch seconds
start, stop = spark.createDataFrame(
[(start, stop)], ("start", "stop")
).select(
[col(c).cast("timestamp").cast("long") for c in ("start", "stop")
]).first()
# Create range with increments and cast to timestamp
return spark.range(start, stop, interval).select(
col("id").cast("timestamp").alias("value")
)
Пример использования:
generate_series("2000-01-01", "2000-01-05", 60 * 60).show(5) # By hour
+-------------------+
| value|
+-------------------+
|2000-01-01 00:00:00|
|2000-01-01 01:00:00|
|2000-01-01 02:00:00|
|2000-01-01 03:00:00|
|2000-01-01 04:00:00|
+-------------------+
only showing top 5 rows
generate_series("2000-01-01", "2000-01-05", 60 * 60 * 24).show() # By day
+-------------------+
| value|
+-------------------+
|2000-01-01 00:00:00|
|2000-01-02 00:00:00|
|2000-01-03 00:00:00|
|2000-01-04 00:00:00|
+-------------------+
Ответ @Rakesh правильный, но я хотел бы поделиться менее подробным решением:
import datetime
import pyspark.sql.types
from pyspark.sql.functions import UserDefinedFunction
# UDF
def generate_date_series(start, stop):
return [start + datetime.timedelta(days=x) for x in range(0, (stop-start).days + 1)]
# Register UDF for later usage
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()) )
# mydf is a DataFrame with columns `start` and `stop` of type DateType()
mydf.createOrReplaceTempView("mydf")
spark.sql("SELECT explode(generate_date_series(start, stop)) FROM mydf").show()
Основываясь на ответе user10938362, просто демонстрируя способ использования диапазона без UDF, при условии, что вы пытаетесь создать фрейм данных с датами на основе некоторого принятого набора данных, а не с жестко запрограммированным запуском / остановом.
# start date is min date
date_min=int(df.agg({'date': 'min'}).first()[0])
# end date is current date or alternatively could use max as above
date_max=(
spark.sql('select unix_timestamp(current_timestamp()) as date_max')
.collect()[0]['date_max']
)
# range is int, unix time is s so 60*60*24=day
df=spark.range(date_min, date_max, 60*60*24).select('id')