Как найти кумулятивную частоту без группировки в pyspark dataframe
У меня есть столбец подсчета в фрейме данных Pyspark как:
id Count Percent
a 3 50
b 3 50
Я хочу получить результат в виде кадра:
id Count Percent CCount CPercent
a 3 50 3 50
b 3 50 6 100
Я не могу использовать pandas dataframe, так как база данных очень большая. Я нашел ответы, указывающие на раздел окна, но у меня нет такого столбца для разделения. Кто-нибудь может подсказать, как это сделать в фрейме pyspark. Примечание: pyspark версия 1.6
1 ответ
Оконный подход потребует перемещения всех данных в один раздел, и, как вы указали в своем посте, ваш набор данных слишком велик для этого. Чтобы обойти это, я немного адаптировал этот подход. Этот метод вычисляет совокупную сумму для каждого раздела после создания словаря смещения для каждого раздела. Это позволяет рассчитать накопленную сумму для каждого раздела параллельно с минимальной перестановкой данных:
Сначала давайте сгенерируем некоторые тестовые данные:
data = sc.parallelize([('a',1,25.0),('b',2,25.0),('c',3,50.0)]).toDF(['id','Count','Percent'])
Это вспомогательный метод, который я настроил (см. Оригинальный код здесь)
from collections import defaultdict
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql import Window
def cumulative_sum_for_each_group_per_partition(partition_index, event_stream):
cumulative_sum = defaultdict(float)
for event in event_stream:
cumulative_sum["Count"] += event["Count"]
cumulative_sum["Percent"] += event["Percent"]
for grp, cumulative_sum in cumulative_sum .iteritems():
yield (grp, (partition_index, cumulative_sum))
def compute_offsets_per_group_factory(num_partitions):
def _mapper(partial_sum_stream):
per_partition_cumulative_sum = dict(partial_sum_stream)
cumulative_sum = 0
offset = {}
for partition_index in range(num_partitions):
offset[partition_index] = cumulative_sum
cumulative_sum += per_partition_cumulative_sum.get(partition_index, 0)
return offset
return _mapper
def compute_cumulative_sum_per_group_factory(global_offset):
def _mapper(partition_index, event_stream):
local_cumulative_sum = defaultdict(float)
for event in event_stream:
local_cumulative_sum["Count"] += event["Count"]
count_cumulative_sum = local_cumulative_sum["Count"] + global_offset.value["Count"][partition_index]
local_cumulative_sum["Percent"] += event["Percent"]
percentage_cumulative_sum = local_cumulative_sum["Percent"] + global_offset.value["Percent"][partition_index]
yield Row(CCount= count_cumulative_sum, CPercent = percentage_cumulative_sum, **event.asDict())
return _mapper
def compute_cumulative_sum(points_rdd):
# First pass to compute the cumulative offset dictionary
compute_offsets_per_group = compute_offsets_per_group_factory(points_rdd.getNumPartitions())
offsets_per_group = points_rdd.\
mapPartitionsWithIndex(cumulative_sum_for_each_group_per_partition, preservesPartitioning=True).\
groupByKey().mapValues(compute_offsets_per_group).\
collectAsMap()
# Second pass to compute the cumulative sum using the offset dictionary
sc = points_rdd.context
compute_cumulative_sum_per_group = compute_cumulative_sum_per_group_factory(sc.broadcast(offsets_per_group))
return points_rdd.\
mapPartitionsWithIndex(compute_cumulative_sum_per_group, preservesPartitioning=True)
Используя эти вспомогательные методы на тестовых данных:
compute_cumulative_sum(data.rdd).toDF().show()
дает:
+------+--------+-----+-------+---+
|CCount|CPercent|Count|Percent| id|
+------+--------+-----+-------+---+
| 1.0| 25.0| 1| 25.0| a|
| 3.0| 50.0| 2| 25.0| b|
| 6.0| 100.0| 3| 50.0| c|
+------+--------+-----+-------+---+