Искрайтесь, датафрейм отбрасывает дубликаты и держит первым
Вопрос: в пандах при отбрасывании дубликатов вы можете указать, какие столбцы оставить. Есть ли аналог в Spark Dataframes?
Панды:
df.sort_values('actual_datetime', ascending=False).drop_duplicates(subset=['scheduled_datetime', 'flt_flightnumber'], keep='first')
У Spark dataframe (я использую Spark 1.6.0) нет опции keep
df.orderBy(['actual_datetime']).dropDuplicates(subset=['scheduled_datetime', 'flt_flightnumber'])
Представьте, что "schedule_datetime" и "flt_flightnumber" - это столбцы 6,17. Создавая ключи на основе значений этих столбцов, мы также можем дедуплицировать
def get_key(x):
return "{0}{1}".format(x[6],x[17])
df= df.map(lambda x: (get_key(x),x)).reduceByKey(lambda x,y: (x))
но как указать, чтобы сохранить первый ряд и избавиться от других дубликатов? Как насчет последнего ряда?
9 ответов
Всем, кто говорит, что dropDuplicates сохраняет первое вхождение - это не совсем правильно.
dropDuplicates сохраняет "первое вхождение" - только если есть 1 раздел. Ниже приведены некоторые примеры.
Единственный способ получить эквивалент функциональности "keep: {'first', 'last',}", подобного отбрасыванию дубликатов в Pandas, - это использовать функцию Spark Window + rank + filter, например: Получите первые n в каждой группе DataFrame в pyspark
Это проверено в Spark 2.4.0 с использованием pyspark.
import pandas as pd
# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
col1 datestr
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-02-01
1 1 2018-02-01
2 2 2018-02-01
3 3 2018-02-01
4 4 2018-02-01
0 0 2018-03-01
1 1 2018-03-01
2 2 2018-03-01
3 3 2018-03-01
4 4 2018-03-01
# first example
# does not give first (based on datestr)
(spark.createDataFrame(dfall)
.orderBy('datestr')
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-02-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-01-01|
+----+----------+
# second example
# testing what happens with repartition
(spark.createDataFrame(dfall)
.orderBy('datestr')
.repartition('datestr')
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-02-01|
| 1|2018-01-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-02-01|
+----+----------+
#third example
# testing with coalesce(1)
(spark
.createDataFrame(dfall)
.orderBy('datestr')
.coalesce(1)
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
+----+----------+
# fourth example
# testing with reverse sort then coalesce(1)
(spark
.createDataFrame(dfall)
.orderBy('datestr', ascending = False)
.coalesce(1)
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates based on occurrence of sorted datestr```
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-03-01|
| 2|2018-03-01|
| 3|2018-03-01|
| 4|2018-03-01|
+----+----------+
Использовать
window
а также
row_number
функции.
Порядок по возрастанию или убыванию, чтобы выбрать первый или последний.
from pyspark.sql import Window
from pyspark.sql import functions as f
window = Window.partitionBy("col1").orderBy("datestr").asc()
df = (df.withColumn('row', f.row_number().over(window))\
.filter(col('row') == 1)
.drop('row')
.show())
Вы можете использовать окно с row_number:
import pandas as pd
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = spark.createDataFrame(pd.concat([df1,df2,df3]))
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col,row_number
window = Window.partitionBy('col1').orderBy(col('datestr'))
dfall.select('*', row_number().over(window).alias('posicion')).show()
dfall.select('*', row_number().over(window).alias('posicion')).where('posicion ==1').show()
+----+----------+--------+
|col1| datestr|posicion|
+----+----------+--------+
| 0|2018-01-01| 1|
| 0|2018-02-01| 2|
| 0|2018-03-01| 3|
| 1|2018-01-01| 1|
| 1|2018-02-01| 2|
| 1|2018-03-01| 3|
| 3|2018-01-01| 1|
| 3|2018-02-01| 2|
| 3|2018-03-01| 3|
| 2|2018-01-01| 1|
| 2|2018-02-01| 2|
| 2|2018-03-01| 3|
| 4|2018-01-01| 1|
| 4|2018-02-01| 2|
| 4|2018-03-01| 3|
+----+----------+--------+
+----+----------+--------+
|col1| datestr|posicion|
+----+----------+--------+
| 0|2018-01-01| 1|
| 1|2018-01-01| 1|
| 3|2018-01-01| 1|
| 2|2018-01-01| 1|
| 4|2018-01-01| 1|
+----+----------+--------+
Я сделал следующее:
dataframe.groupBy("uniqueColumn").min("time")
Это будет сгруппировано по заданному столбцу, и в той же группе выберите тот, у которого минимальное время (это сохранит первый и удалит другие)
Используйте метод dropDuplicates по умолчанию, он сохраняет первый случай
Я только что сделал что-то похожее на то, что вам нужно, используя pyspark drop_duplicates.
Ситуация такая. У меня есть 2 кадра данных (исходящие из 2 файлов), которые точно такие же, за исключением 2 столбцов file_date (дата файла, извлеченная из имени файла) и data_date (штамп даты строки). Досадно, что у меня есть строки с одинаковыми данными data_date (и все другие ячейки столбцов), но с разными file_date, поскольку они реплицируются в каждом новом файле с добавлением одной новой строки.
Мне нужно было захватить все строки из нового файла, а также одну строку, оставшуюся от предыдущего файла. Этой строки нет в новом файле. Остальные столбцы справа от data_date одинаковы в двух файлах для одного и того же data_date.
file_1_20190122 - df1
+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
| AGGH|2019-01-22|2019-01-16| <- One row we want to keep where file_date 22nd
| AGGH|2019-01-22|2019-01-17|
| AGGH|2019-01-22|2019-01-18|
| AGGH|2019-01-22|2019-01-19|
| AGGH|2019-01-22|2019-01-20|
| AGGH|2019-01-22|2019-01-21|
| AGGH|2019-01-22|2019-01-22|
file_2_20190123 - df2
+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
| AGGH|2019-01-23|2019-01-17| \/ ALL rows we want to keep where file_date 23rd
| AGGH|2019-01-23|2019-01-18|
| AGGH|2019-01-23|2019-01-19|
| AGGH|2019-01-23|2019-01-20|
| AGGH|2019-01-23|2019-01-21|
| AGGH|2019-01-23|2019-01-22|
| AGGH|2019-01-23|2019-01-23|
Это потребует от нас сортировки и объединения файлов df, а затем их дедупликации во всех столбцах, кроме одного. Позвольте мне провести вас через это.
union_df = df1.union(df2) \
.sort(['station_code', 'data_date'], ascending=[True, True])
+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
| AGGH|2019-01-22|2019-01-16| <- keep
| AGGH|2019-01-23|2019-01-17| <- keep
| AGGH|2019-01-22|2019-01-17| x- drop
| AGGH|2019-01-22|2019-01-18| x- drop
| AGGH|2019-01-23|2019-01-18| <- keep
| AGGH|2019-01-23|2019-01-19| <- keep
| AGGH|2019-01-22|2019-01-19| x- drop
| AGGH|2019-01-23|2019-01-20| <- keep
| AGGH|2019-01-22|2019-01-20| x- drop
| AGGH|2019-01-22|2019-01-21| x- drop
| AGGH|2019-01-23|2019-01-21| <- keep
| AGGH|2019-01-23|2019-01-22| <- keep
| AGGH|2019-01-22|2019-01-22| x- drop
| AGGH|2019-01-23|2019-01-23| <- keep
Здесь мы отбрасываем уже отсортированные дублированные строки, исключая ключи ['file_date', 'data_date'].
nonduped_union_df = union_df \
.drop_duplicates(['station_code', 'data_date', 'time_zone',
'latitude', 'longitude', 'elevation',
'highest_temperature', 'lowest_temperature',
'highest_temperature_10_year_normal',
'another_50_columns'])
И результат содержит ОДНУ строку с самой ранней датой из DF1, которой нет в DF2, и ВСЕ строки из DF2.
nonduped_union_df.select(['station_code', 'file_date', 'data_date',
'highest_temperature', 'lowest_temperature']) \
.sort(['station_code', 'data_date'], ascending=[True, True]) \
.show(30)
+------------+----------+----------+-------------------+------------------+
|station_code| file_date| data_date|highest_temperature|lowest_temperature|
+------------+----------+----------+-------------------+------------------+
| AGGH|2019-01-22|2019-01-16| 90| 77| <- df1 22nd
| AGGH|2019-01-23|2019-01-17| 90| 77| \/- df2 23rd
| AGGH|2019-01-23|2019-01-18| 91| 75|
| AGGH|2019-01-23|2019-01-19| 88| 77|
| AGGH|2019-01-23|2019-01-20| 88| 77|
| AGGH|2019-01-23|2019-01-21| 88| 77|
| AGGH|2019-01-23|2019-01-22| 90| 75|
| AGGH|2019-01-23|2019-01-23| 90| 75|
| CWCA|2019-01-22|2019-01-15| 23| -2|
| CWCA|2019-01-23|2019-01-16| 7| -8|
| CWCA|2019-01-23|2019-01-17| 28| -6|
| CWCA|2019-01-23|2019-01-18| 0| -13|
| CWCA|2019-01-23|2019-01-19| 25| -15|
| CWCA|2019-01-23|2019-01-20| -4| -18|
| CWCA|2019-01-23|2019-01-21| 27| -6|
| CWCA|2019-01-22|2019-01-22| 30| 17|
| CWCA|2019-01-23|2019-01-22| 30| 13|
| CWCO|2019-01-22|2019-01-15| 34| 29|
| CWCO|2019-01-23|2019-01-16| 33| 13|
| CWCO|2019-01-22|2019-01-16| 33| 13|
| CWCO|2019-01-22|2019-01-17| 23| 7|
| CWCO|2019-01-23|2019-01-17| 23| 7|
+------------+----------+----------+-------------------+------------------+
only showing top 30 rows
Возможно, это не лучший ответ для этого случая, но у меня он сработал.
Дай мне знать, если где-то застрял.
Кстати - если кто-нибудь может сказать мне, как выбрать все столбцы в df, кроме одного, не перечисляя их в списке - я буду очень благодарен.
С уважением, G
Я бы попробовал так:
Предполагая, что ваш data_df выглядит так, и мы хотим сохранить строки с наибольшим значением в col1 на datestr:
col1 datestr
0 2018-01-01
1 2018-01-01
2 2018-01-01
3 2018-01-01
4 2018-01-01
0 2018-02-01
1 2018-02-01
2 2018-02-01
3 2018-02-01
4 2018-02-01
0 2018-03-01
1 2018-03-01
2 2018-03-01
3 2018-03-01
4 2018-03-01
ты можешь сделать:
from pyspark.sql import Window
import pyspark.sql.functions as F
w = Window.partitionBy('datestr')
data_df = data_df.withColumn("max", F.max(F.col("col1"))\
.over(w))\
.where(F.col('max') == F.col('col1'))\
.drop("max")
это приводит к:
col1 datestr
4 2018-01-01
4 2018-02-01
4 2018-03-01
если наборы данных невелики, конвертируйте в фрейм данных pandas и удаляйте дубликаты, сохраняя последние или первые, а затем конвертируйте обратно.
Учитывая приведенную ниже таблицу:
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
| 0|2018-02-01|
| 1|2018-02-01|
| 2|2018-02-01|
| 3|2018-02-01|
| 4|2018-02-01|
| 0|2018-03-01|
| 1|2018-03-01|
| 2|2018-03-01|
| 3|2018-03-01|
| 4|2018-03-01|
+----+----------+
Сделать это можно в два этапа:
Сгруппируйте по данной таблице на основе столбца col1 и выберите минимальную дату.
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
+----+----------+
left Присоединяется к результирующей таблице с исходной таблицей в столбцах col1 и min_datestr.