Pyspark groupby с udf: плохая работа на локальной машине

Я пытаюсь провести анализ огромного набора данных, состоящего из нескольких ежедневных файлов по 15 ГБ каждый. Чтобы быть быстрее, просто для целей тестирования, я создал очень маленький набор данных, который включает в себя все соответствующие сценарии. Я должен проанализировать правильную последовательность действий (т. Е. Она похожа на журнал или аудит) для каждого пользователя.

Для этого я определил функцию udf, а затем применил групповую обработку. ниже код для воспроизведения моего варианта использования:

import pyspark
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import time
sc = SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession.builder.appName('example').getOrCreate()

d = spark.createDataFrame(
    [(133515, "user1", 100, 'begin'),
     (133515, "user1", 125, 'ok'),
     (133515, "user1", 150, 'ok'),
     (133515, "user1", 200, 'end'),
     (133515, "user1", 250, 'begin'),
     (133515, "user1", 300, 'end'),
     (133515, "user1", 310, 'begin'),
     (133515, "user1", 335, 'ok'),
     (133515, "user1", 360, 'ok'),
     # user1 missing END and STOPPED
     (789456, "user2", 150, 'begin'),
     (789456, "user2", 175, 'ok'),
     (789456, "user2", 200, 'end'),
     # user2 stopped
     (712346, "user3", 100, 'begin'),
     (712346, "user3", 125, 'ok'),
     (712346, "user3", 150, 'ok'),
     (712346, "user3", 200, 'end'),
     #user3 stopped
     (789456, "user4", 150, 'begin'),
     (789456, "user4", 300, 'end'),
     (789456, "user4", 350, 'begin'),
     (789456, "user4", 375, 'ok'),
     (789456, "user4", 450, 'end'),
     (789456, "user4", 475, 'ok'),
     #user4 missing BEGIN but ALIVE

    ], ("ID", "user", "epoch", "ACTION")).orderBy(F.col('epoch'))
d.show()
zip_lists = F.udf(lambda x, y: [list(z) for z in zip(x, y)], ArrayType(StringType()))

start=time.time()
d2 = d.groupBy(F.col('ID'), F.col('user'))\
.agg(zip_lists(F.collect_list('epoch'), F.collect_list('ACTION')).alias('couples'))
d2.show(50, False)
end = time.time()
print(end-start)

это приносит мне следующие результаты:

+------+-----+--------------------------------------------------------------------------------------------------------------+
|ID    |user |couples                                                                                                       |
+------+-----+--------------------------------------------------------------------------------------------------------------+
|789456|user4|[[150, begin], [300, end], [350, begin], [375, ok], [450, end], [475, ok]]                                    |
|712346|user3|[[100, begin], [125, ok], [150, ok], [200, end]]                                                              |
|133515|user1|[[100, begin], [125, ok], [150, ok], [200, end], [250, begin], [300, end], [310, begin], [335, ok], [360, ok]]|
|789456|user2|[[150, begin], [175, ok], [200, end]]                                                                         |
+------+-----+--------------------------------------------------------------------------------------------------------------+

189.9082863330841

Разве это не слишком медленно?

Я использую современный ноутбук с Conda. Я установил pyspark с помощью навигатора conda.

Что-то я делаю не так? это слишком много для такого маленького набора данных

1 ответ

Решение

Вместо агрегирования по двум столбцам я попытался создать новый столбец и собрать его:

start=time.time()

d2 = d.groupBy(F.col('ID'), F.col('user'))\
      .agg(zip_lists(F.collect_list('epoch'), F.collect_list('ACTION')).alias('couples'))\
      .collect()

end = time.time()
print('first solution:', end-start)



start = time.time()

d3 = d.select(d.ID, d.user, F.struct([d.epoch, d.ACTION]).alias('couple'))
d4 = d3.groupBy(d3.ID, d3.user)\
       .agg(F.collect_list(d3.couple).alias('couples'))\
       .collect()

end = time.time()
print('second solution:', end-start)

На моей машине это изменение делает результат немного лучше!:D:

first solution: 2.247227907180786
second solution: 0.8280930519104004
Другие вопросы по тегам