Почему pandas.grouby.mean намного быстрее параллельной реализации
В очень большом наборе данных я использовал функцию среднего значения для панд:
import pandas as pd
df=pd.read_csv("large_dataset.csv")
df.groupby(['variable']).mean()
Похоже, что функция не использует многопроцессорность, и поэтому я реализовал параллельную версию:
import pandas as pd
from multiprocessing import Pool, cpu_count
def meanFunc(tmp_name, df_input):
df_res=df_input.mean().to_frame().transpose()
return df_res
def applyParallel(dfGrouped, func):
num_process=int(cpu_count())
with Pool(num_process) as p:
ret_list=p.starmap(func, [[name, group] for name, group in dfGrouped])
return pd.concat(ret_list)
applyParallel(df.groupby(['variable']), meanFunc)
Однако, похоже, что реализация pandas все еще намного быстрее, чем моя параллельная реализация.
Я смотрю на исходный код для панды groupby, и я вижу, что он использует Cython. Это причина?
def _cython_agg_general(self, how, alt=None, numeric_only=True,
min_count=-1):
output = {}
for name, obj in self._iterate_slices():
is_numeric = is_numeric_dtype(obj.dtype)
if numeric_only and not is_numeric:
continue
try:
result, names = self.grouper.aggregate(obj.values, how,
min_count=min_count)
except AssertionError as e:
raise GroupByError(str(e))
output[name] = self._try_cast(result, obj)
if len(output) == 0:
raise DataError('No numeric types to aggregate')
return self._wrap_aggregated_output(output, names)
1 ответ
Краткий ответ - используйте dask, если вы хотите параллелизма для подобных случаев. В вашем подходе есть подводные камни, которых он избегает. Это все еще не может быть быстрее, но даст вам лучший выстрел и является в значительной степени заменой панд.
Более длинный ответ
1) Параллелизм по своей природе добавляет накладные расходы, поэтому в идеале операция, которую вы выполняете параллельно, стоит довольно дорого. Сложение чисел не особенно - вы правы, что здесь используется Cython, код, который вы просматриваете, - это логика рассылки. Здесь находится основной ядро Cython, которое переводится в очень простой цикл C.
2) Вы используете мультиобработку, что означает, что каждый процесс должен получить копию данных. Это дорого Обычно вы должны делать это в python из-за GIL - вы действительно можете (и dask делает) использовать потоки здесь, потому что операция pandas находится в C и освобождает GIL.
3) Как отметил @AKX в комментариях - итерация перед распараллеливанием (... name, group in dfGrouped
) также является относительно дорогим - это создание новых кадров данных для каждой группы. Оригинальный алгоритм панд перебирает данные на месте.