Pyspark Dataframe с помощью фильтрации

У меня есть фрейм данных, как показано ниже

cust_id   req    req_met
-------   ---    -------
 1         r1      1
 1         r2      0
 1         r2      1
 2         r1      1
 3         r1      1
 3         r2      1
 4         r1      0
 5         r1      1
 5         r2      0
 5         r1      1

Я должен посмотреть на клиентов, посмотреть, сколько у них требований и посмотреть, выполнили ли они хотя бы один раз. Может быть несколько записей с одним и тем же клиентом и требованием, один с удовлетворенным и не выполненным. В приведенном выше случае мой вывод должен быть

cust_id
-------
  1
  2
  3

Что я сделал, так это

say initial dataframe is df
df1 = df.groupby('cust_id').countdistinct('req').alias('num_of_req').sum('req_met').alias('sum_req_met')

df2 = df1.filter(df1.num_of_req == df1.sum_req_met)

Но в некоторых случаях это не дает правильных результатов

Как это может быть сделано?

3 ответа

Решение

Во-первых, я просто подготовлю набор данных игрушек из приведенного выше,

from pyspark.sql.functions import col
import pyspark.sql.functions as fn

df = spark.createDataFrame([[1, 'r1', 1],
 [1, 'r2', 0],
 [1, 'r2', 1],
 [2, 'r1', 1],
 [3, 'r1', 1],
 [3, 'r2', 1],
 [4, 'r1', 0],
 [5, 'r1', 1],
 [5, 'r2', 0],
 [5, 'r1', 1]], schema=['cust_id', 'req', 'req_met'])
df = df.withColumn('req_met', col("req_met").cast(IntegerType()))
df = df.withColumn('cust_id', col("cust_id").cast(IntegerType()))

Я делаю то же самое по группам cust_id а также req затем посчитать req_met, После этого я создаю функцию для удовлетворения этих требований до 0, 1

def floor_req(r):
    if r >= 1:
        return 1
    else:
        return 0
udf_floor_req = udf(floor_req, IntegerType())
gr = df.groupby(['cust_id', 'req'])
df_grouped = gr.agg(fn.sum(col('req_met')).alias('sum_req_met'))
df_grouped_floor = df_grouped.withColumn('sum_req_met', udf_floor_req('sum_req_met'))

Теперь мы можем проверить, выполнил ли каждый клиент все требования, посчитав различное количество требований и общее количество выполненных требований.

df_req = df_grouped_floor.groupby('cust_id').agg(fn.sum('sum_req_met').alias('sum_req'), 
                                                 fn.count('req').alias('n_req'))

Наконец, вам просто нужно проверить, равны ли два столбца:

df_req.filter(df_req['sum_req'] == df_req['n_req'])[['cust_id']].orderBy('cust_id').show()
 select cust_id from  
(select cust_id , MIN(sum_value) as m from 
( select cust_id,req ,sum(req_met) as sum_value from <data_frame> group by cust_id,req )
 temp group by cust_id )temp1 
where m>0 ;

Это даст желаемый результат

Это метод без каких-либо файлов udf. Немного сложно. Обычно группировка по cust_id, выполняется req, а затем определяется сумма req_met. Затем удалите cust_id, чей sum == 0.

      df.filter( ~df.cust_id.isin([x[0] for x in df.groupby('cust_id','req').agg(F.sum('req_met').alias('sum_req_met')).filter(col('sum_req_met')==0).select('cust_id').collect()]) ).select('cust_id').distinct().show()
Другие вопросы по тегам