Искрайтесь, датафрейм отбрасывает дубликаты и держит первым

Вопрос: в пандах при отбрасывании дубликатов вы можете указать, какие столбцы оставить. Есть ли аналог в Spark Dataframes?

Панды:

df.sort_values('actual_datetime', ascending=False).drop_duplicates(subset=['scheduled_datetime', 'flt_flightnumber'], keep='first')

У Spark dataframe (я использую Spark 1.6.0) нет опции keep

df.orderBy(['actual_datetime']).dropDuplicates(subset=['scheduled_datetime', 'flt_flightnumber'])

Представьте, что "schedule_datetime" и "flt_flightnumber" - это столбцы 6,17. Создавая ключи на основе значений этих столбцов, мы также можем дедуплицировать

def get_key(x):
    return "{0}{1}".format(x[6],x[17])

df= df.map(lambda x: (get_key(x),x)).reduceByKey(lambda x,y: (x))

но как указать, чтобы сохранить первый ряд и избавиться от других дубликатов? Как насчет последнего ряда?

9 ответов

Всем, кто говорит, что dropDuplicates сохраняет первое вхождение - это не совсем правильно.

dropDuplicates сохраняет "первое вхождение" - только если есть 1 раздел. Ниже приведены некоторые примеры.

Единственный способ получить эквивалент функциональности "keep: {'first', 'last',}", подобного отбрасыванию дубликатов в Pandas, - это использовать функцию Spark Window + rank + filter, например: Получите первые n в каждой группе DataFrame в pyspark

Это проверено в Spark 2.4.0 с использованием pyspark.

import pandas as pd

# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
   col1     datestr
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-02-01
1     1  2018-02-01
2     2  2018-02-01
3     3  2018-02-01
4     4  2018-02-01
0     0  2018-03-01
1     1  2018-03-01
2     2  2018-03-01
3     3  2018-03-01
4     4  2018-03-01
# first example
# does not give first (based on datestr)

(spark.createDataFrame(dfall)
   .orderBy('datestr')
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-02-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-01-01|
+----+----------+
# second example
# testing what happens with repartition

(spark.createDataFrame(dfall)
   .orderBy('datestr')
   .repartition('datestr')
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates NOT based on occurrence of sorted datestr

+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-02-01|
|   1|2018-01-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-02-01|
+----+----------+
#third example
# testing with coalesce(1)

(spark
   .createDataFrame(dfall)
   .orderBy('datestr')
   .coalesce(1)
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates based on occurrence of sorted datestr
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
+----+----------+
# fourth example
# testing with reverse sort then coalesce(1)

(spark
   .createDataFrame(dfall)
   .orderBy('datestr', ascending = False)
   .coalesce(1)
   .dropDuplicates(subset = ['col1'])
   .show()
)
# dropDuplicates based on occurrence of sorted datestr```
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-03-01|
|   2|2018-03-01|
|   3|2018-03-01|
|   4|2018-03-01|
+----+----------+

Использовать windowа также row_numberфункции.
Порядок по возрастанию или убыванию, чтобы выбрать первый или последний.

      from pyspark.sql import Window
from pyspark.sql import functions as f

window = Window.partitionBy("col1").orderBy("datestr").asc()
df = (df.withColumn('row', f.row_number().over(window))\
.filter(col('row') == 1)
.drop('row')
.show())

Вы можете использовать окно с row_number:

import pandas as pd
df1 = pd.DataFrame({'col1':range(0,5)})
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame({'col1':range(0,5)})
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame({'col1':range(0,5)})
df3['datestr'] = '2018-03-01'
dfall = spark.createDataFrame(pd.concat([df1,df2,df3]))

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col,row_number
window = Window.partitionBy('col1').orderBy(col('datestr'))
dfall.select('*', row_number().over(window).alias('posicion')).show()
dfall.select('*', row_number().over(window).alias('posicion')).where('posicion ==1').show()

+----+----------+--------+
|col1|   datestr|posicion|
+----+----------+--------+
|   0|2018-01-01|       1|
|   0|2018-02-01|       2|
|   0|2018-03-01|       3|
|   1|2018-01-01|       1|
|   1|2018-02-01|       2|
|   1|2018-03-01|       3|
|   3|2018-01-01|       1|
|   3|2018-02-01|       2|
|   3|2018-03-01|       3|
|   2|2018-01-01|       1|
|   2|2018-02-01|       2|
|   2|2018-03-01|       3|
|   4|2018-01-01|       1|
|   4|2018-02-01|       2|
|   4|2018-03-01|       3|
+----+----------+--------+
+----+----------+--------+
|col1|   datestr|posicion|
+----+----------+--------+
|   0|2018-01-01|       1|
|   1|2018-01-01|       1|
|   3|2018-01-01|       1|
|   2|2018-01-01|       1|
|   4|2018-01-01|       1|
+----+----------+--------+

Я сделал следующее:

dataframe.groupBy("uniqueColumn").min("time")

Это будет сгруппировано по заданному столбцу, и в той же группе выберите тот, у которого минимальное время (это сохранит первый и удалит другие)

Используйте метод dropDuplicates по умолчанию, он сохраняет первый случай

Я только что сделал что-то похожее на то, что вам нужно, используя pyspark drop_duplicates.

Ситуация такая. У меня есть 2 кадра данных (исходящие из 2 файлов), которые точно такие же, за исключением 2 столбцов file_date (дата файла, извлеченная из имени файла) и data_date (штамп даты строки). Досадно, что у меня есть строки с одинаковыми данными data_date (и все другие ячейки столбцов), но с разными file_date, поскольку они реплицируются в каждом новом файле с добавлением одной новой строки.

Мне нужно было захватить все строки из нового файла, а также одну строку, оставшуюся от предыдущего файла. Этой строки нет в новом файле. Остальные столбцы справа от data_date одинаковы в двух файлах для одного и того же data_date.

file_1_20190122 - df1

+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
|        AGGH|2019-01-22|2019-01-16| <- One row we want to keep where file_date 22nd
|        AGGH|2019-01-22|2019-01-17|
|        AGGH|2019-01-22|2019-01-18|
|        AGGH|2019-01-22|2019-01-19|
|        AGGH|2019-01-22|2019-01-20|
|        AGGH|2019-01-22|2019-01-21|
|        AGGH|2019-01-22|2019-01-22|


file_2_20190123 - df2

+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
|        AGGH|2019-01-23|2019-01-17| \/ ALL rows we want to keep where file_date 23rd
|        AGGH|2019-01-23|2019-01-18|
|        AGGH|2019-01-23|2019-01-19|
|        AGGH|2019-01-23|2019-01-20|
|        AGGH|2019-01-23|2019-01-21|
|        AGGH|2019-01-23|2019-01-22|
|        AGGH|2019-01-23|2019-01-23|

Это потребует от нас сортировки и объединения файлов df, а затем их дедупликации во всех столбцах, кроме одного. Позвольте мне провести вас через это.

union_df = df1.union(df2) \
                .sort(['station_code', 'data_date'], ascending=[True, True])

+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
|        AGGH|2019-01-22|2019-01-16| <- keep
|        AGGH|2019-01-23|2019-01-17| <- keep
|        AGGH|2019-01-22|2019-01-17| x- drop
|        AGGH|2019-01-22|2019-01-18| x- drop
|        AGGH|2019-01-23|2019-01-18| <- keep
|        AGGH|2019-01-23|2019-01-19| <- keep
|        AGGH|2019-01-22|2019-01-19| x- drop
|        AGGH|2019-01-23|2019-01-20| <- keep
|        AGGH|2019-01-22|2019-01-20| x- drop
|        AGGH|2019-01-22|2019-01-21| x- drop
|        AGGH|2019-01-23|2019-01-21| <- keep
|        AGGH|2019-01-23|2019-01-22| <- keep
|        AGGH|2019-01-22|2019-01-22| x- drop
|        AGGH|2019-01-23|2019-01-23| <- keep

Здесь мы отбрасываем уже отсортированные дублированные строки, исключая ключи ['file_date', 'data_date'].

nonduped_union_df = union_df \
            .drop_duplicates(['station_code', 'data_date', 'time_zone', 
                              'latitude', 'longitude', 'elevation', 
                              'highest_temperature', 'lowest_temperature', 
                              'highest_temperature_10_year_normal', 
                              'another_50_columns'])

И результат содержит ОДНУ строку с самой ранней датой из DF1, которой нет в DF2, и ВСЕ строки из DF2.

nonduped_union_df.select(['station_code', 'file_date', 'data_date', 
                          'highest_temperature', 'lowest_temperature']) \
                         .sort(['station_code', 'data_date'], ascending=[True, True]) \
                         .show(30)


+------------+----------+----------+-------------------+------------------+
|station_code| file_date| data_date|highest_temperature|lowest_temperature|
+------------+----------+----------+-------------------+------------------+
|        AGGH|2019-01-22|2019-01-16|                 90|                77| <- df1 22nd
|        AGGH|2019-01-23|2019-01-17|                 90|                77| \/- df2 23rd
|        AGGH|2019-01-23|2019-01-18|                 91|                75|
|        AGGH|2019-01-23|2019-01-19|                 88|                77|
|        AGGH|2019-01-23|2019-01-20|                 88|                77|
|        AGGH|2019-01-23|2019-01-21|                 88|                77|
|        AGGH|2019-01-23|2019-01-22|                 90|                75|
|        AGGH|2019-01-23|2019-01-23|                 90|                75|
|        CWCA|2019-01-22|2019-01-15|                 23|                -2|
|        CWCA|2019-01-23|2019-01-16|                  7|                -8|
|        CWCA|2019-01-23|2019-01-17|                 28|                -6|
|        CWCA|2019-01-23|2019-01-18|                  0|               -13|
|        CWCA|2019-01-23|2019-01-19|                 25|               -15|
|        CWCA|2019-01-23|2019-01-20|                 -4|               -18|
|        CWCA|2019-01-23|2019-01-21|                 27|                -6|
|        CWCA|2019-01-22|2019-01-22|                 30|                17|
|        CWCA|2019-01-23|2019-01-22|                 30|                13|
|        CWCO|2019-01-22|2019-01-15|                 34|                29|
|        CWCO|2019-01-23|2019-01-16|                 33|                13|
|        CWCO|2019-01-22|2019-01-16|                 33|                13|
|        CWCO|2019-01-22|2019-01-17|                 23|                 7|
|        CWCO|2019-01-23|2019-01-17|                 23|                 7|
+------------+----------+----------+-------------------+------------------+
only showing top 30 rows

Возможно, это не лучший ответ для этого случая, но у меня он сработал.

Дай мне знать, если где-то застрял.

Кстати - если кто-нибудь может сказать мне, как выбрать все столбцы в df, кроме одного, не перечисляя их в списке - я буду очень благодарен.

С уважением, G

Я бы попробовал так:

Предполагая, что ваш data_df выглядит так, и мы хотим сохранить строки с наибольшим значением в col1 на datestr:

        col1     datestr
     0  2018-01-01
     1  2018-01-01
     2  2018-01-01
     3  2018-01-01
     4  2018-01-01
     0  2018-02-01
     1  2018-02-01
     2  2018-02-01
     3  2018-02-01
     4  2018-02-01
     0  2018-03-01
     1  2018-03-01
     2  2018-03-01
     3  2018-03-01
     4  2018-03-01

ты можешь сделать:

      from pyspark.sql import Window 
import pyspark.sql.functions as F

w = Window.partitionBy('datestr')
data_df = data_df.withColumn("max", F.max(F.col("col1"))\
    .over(w))\
    .where(F.col('max') == F.col('col1'))\
    .drop("max")

это приводит к:

        col1     datestr
     4  2018-01-01
     4  2018-02-01
     4  2018-03-01

если наборы данных невелики, конвертируйте в фрейм данных pandas и удаляйте дубликаты, сохраняя последние или первые, а затем конвертируйте обратно.

Учитывая приведенную ниже таблицу:

      +----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
|   0|2018-02-01|
|   1|2018-02-01|
|   2|2018-02-01|
|   3|2018-02-01|
|   4|2018-02-01|
|   0|2018-03-01|
|   1|2018-03-01|
|   2|2018-03-01|
|   3|2018-03-01|
|   4|2018-03-01|
+----+----------+

Сделать это можно в два этапа:

Сгруппируйте по данной таблице на основе столбца col1 и выберите минимальную дату.

      +----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
+----+----------+

left Присоединяется к результирующей таблице с исходной таблицей в столбцах col1 и min_datestr.

Другие вопросы по тегам