Распараллелить применение после групповых панд
Я использовал rosetta.parallel.pandas_easy для распараллеливания применять после группы, например:
from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
Однако кто-нибудь выяснил, как распараллелить функцию, которая возвращает фрейм данных? Этот код не работает для розетты, как и ожидалось.
def tmpFunc(df):
df['c'] = df.a + df.b
return df
df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
8 ответов
Кажется, это работает, хотя это действительно должно быть встроено в панд
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing
def tmpFunc(df):
df['c'] = df.a + df.b
return df
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
if __name__ == '__main__':
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
print 'parallel version: '
print applyParallel(df.groupby(df.index), tmpFunc)
print 'regular version: '
print df.groupby(df.index).apply(tmpFunc)
print 'ideal version (does not work): '
print df.groupby(df.index).applyParallel(tmpFunc)
Ответ Ивана великолепен, но, похоже, его можно немного упростить, также устраняя необходимость зависеть от joblib:
from multiprocessing import Pool, cpu_count
def applyParallel(dfGrouped, func):
with Pool(cpu_count()) as p:
ret_list = p.map(func, [group for name, group in dfGrouped])
return pandas.concat(ret_list)
Между прочим: это не может заменить любой groupby.apply(), но будет охватывать типичные случаи: например, это должно охватывать случаи 2 и 3 в документации, в то время как вы должны получить поведение случая 1, задав аргумент axis=1
в финал pandas.concat()
вызов.
У меня есть хак, который я использую для получения распараллеливания в Pandas. Я разбиваю свой блок данных на куски, помещаю каждый блок в элемент списка, а затем использую параллельные биты ipython для параллельного применения к списку блоков данных. Затем я собрал список, используя панд concat
функция.
Однако это не всегда применимо. Это работает для меня, потому что функция, которую я хочу применить к каждому фрагменту кадра данных, занимает около минуты. И разборка и сбор моих данных не займет много времени. Так что это явно клудж. С учетом сказанного, вот пример. Я использую ноутбук Ipython, так что вы увидите %%time
магия в моем коде:
## make some example data
import pandas as pd
np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n),
'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')
Для этого примера я собираюсь сделать "чанки", основанные на вышеупомянутой группировке, но это не должно быть то, как данные разбиваются на части. Хотя это довольно распространенная модель.
dflist = []
for name, group in grouped:
dflist.append(group)
установить параллельные биты
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True
написать глупую функцию, чтобы применить к нашим данным
def myFunc(inDf):
inDf['newCol'] = inDf.data ** 10
return inDf
Теперь давайте запустим код последовательно, затем параллельно. сериал первый:
%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
теперь параллельно
%%time
parallel_list = lview.map(myFunc, dflist)
CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
тогда потребуется всего несколько мс, чтобы объединить их обратно в один фрейм данных
%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
Я использую 6 движков IPython на моем MacBook, но вы можете видеть, что он уменьшает время выполнения до 14 с.
Для действительно продолжительного стохастического моделирования я могу использовать бэкэнд AWS, запустив кластер с помощью StarCluster. Однако большую часть времени я распараллеливаю только 8 процессоров на моем MBP.
Краткий комментарий, сопровождающий ответ Д.Д. Лонга. Я обнаружил, что если число групп очень велико (скажем, сотни тысяч), и ваша функция apply выполняет что-то довольно простое и быстрое, то разбивает ваш фрейм данных на куски и назначает каждый блок работнику для выполнения groupby-apply (в последовательном режиме) может быть намного быстрее, чем выполнение параллельного groupby-apply, когда рабочие считывают очередь, содержащую множество групп. Пример:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
Итак, наш фрейм данных выглядит так:
a
0 3425
1 1016
2 8141
3 9263
4 8018
Обратите внимание, что столбец "а" имеет много групп (например, идентификаторы клиентов):
len(df.a.unique())
15000
Функция для работы с нашими группами:
def f1(group):
time.sleep(0.0001)
return group
Начать пул:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
Делаем параллельные групповые заявки:
%%time
for name, group in df.groupby('a'):
p = ppe.submit(f1, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
Давайте теперь добавим столбец, который делит df на гораздо меньшее количество групп:
df['b'] = np.random.randint(0, 12, nrows)
Теперь вместо 15000 групп их всего 12:
len(df.b.unique())
12
Мы разделим наш df и сделаем групповое применение для каждого чанка.
ppe = ProcessPoolExecutor(12)
Оболочка веселья:
def f2(df):
df.groupby('a').apply(f1)
return df
Отправьте каждый чанк, который будет обработан в серии:
%%time
for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
Обратите внимание, что количество времени, проведенного на группу, не изменилось. Скорее всего, изменилась длина очереди, с которой работники зачитывают. Я подозреваю, что происходит то, что рабочие не могут получить доступ к общей памяти одновременно, и постоянно возвращаются, чтобы считать из очереди, и, таким образом, наступают друг другу на пальцы ног. При работе с большими кусками рабочие возвращаются реже, и поэтому эта проблема уменьшается, и общее выполнение выполняется быстрее.
ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: я являюсь владельцем и основным участником/сопровождающим
— это пакет Python, который я создал более 4 лет назад как пакет, который эффективно применяет любую функцию к фрейму данных или серии pandas самым быстрым доступным способом. На сегодняшний день имеет более 2 тысяч звезд GitHub, 250 тысяч загрузок в месяц и покрытие кода 95%.
Начиная с версии 1.3.2,swifter
предлагает простой интерфейс для производительной параллельной группы с применением:
df.swifter.groupby(df.index).apply(tmpFunc)
Я также создал эталонные тесты производительности , демонстрирующие улучшение производительности Swifter, с ключевым визуальным образцом, воспроизведенным здесь:Swifter Groupby Apply Performance Benchmark
Вы можете легко установить быстрее (с функцией группового применения) либо через pip:
pip install swifter[groupby]>=1.3.2
или через конду:
conda install -c conda-forge swifter>=1.3.2 ray>=1.0.0
Пожалуйста, ознакомьтесь с swifterREADME и документацией для получения дополнительной информации.
Люди переходят на использование бодо для параллелизма. Это самый быстрый движок, доступный для распараллеливания Python, поскольку он компилирует ваш код с помощью MPI. Его новый компилятор сделал его намного быстрее, чем Dask, Ray, multiprocessing, pandarel и т. д. Прочтите о бодо и даске в этом сообщении в блоге и посмотрите, что Трэвис говорит о бодо в своем LinkedIn! Он является основателем Anaconda: Цитата: «бодо - это настоящая сделка».
https://bodo.ai/blog/performance-and-cost-of-bodo-vs-spark-dask-ray
Что касается того, как использовать groupby с бодо, здесь я пишу пример кода:
#install bodo through your terminal
conda create -n Bodo python=3.9 -c conda-forge
conda activate Bodo
conda install bodo -c bodo.ai -c conda-forge
Вот пример кода для groupby:
import time
import pandas as pd
import bodo
@bodo.jit
def read_data():
""" a dataframe with 2 columns, headers: 'A', 'B'
or you can just create a data frame instead of reading it from flat file
"""
return pd.read_parquet("your_input_data.pq")
@bodo.jit
def data_groupby(input_df):
t_1 = time.time()
df2 = input_df.groupby("A", as_index=False).sum()
t_2 = time.time()
print("Compute time: {:.2f}".format(t_2-t_1))
return df2, t_2-t_1
if __name__ == "__main__":
df = read_data()
t0 = time.time()
output, compute_time = data_groupby(df)
t2 = time.time()
total_time = t2 - t0
if bodo.get_rank() == 0:
print("Compilation time: {:.2f}".format(total_time - compute_time))
print("Total time second call: {:.2f}".format(total_time))
и, наконец, запустите его с помощью mpiexec через свой терминал. -n определяет количество ядер (ЦП), на которых вы хотите его запустить.
mpiexec -n 4 python filename.py
Вы можете использовать либо multiprocessing
или joblib
для достижения распараллеливания. Однако, если количество групп велико и каждый фрейм данных группы большой, время работы может быть хуже, так как вам нужно много раз переносить эти группы в ЦП. Чтобы уменьшить накладные расходы, мы можем сначала разделить данные на большие фрагменты, а затем распараллелить вычисления на этих фрагментах.
Например, предположим, что вы обрабатываете данные о запасах, где вам нужно сгруппировать акции по их коду, а затем рассчитать некоторую статистику. Вы можете сначала сгруппировать по первому символу кода (большие фрагменты), а затем выполнить действия в этой фиктивной группе:
import pandas as pd
from joblib import Parallel, delayed
def group_func(dummy_group):
# Do something to the group just like doing to the original dataframe.
# Example: calculate daily return.
res = []
for _, g in dummy_group.groupby('code'):
g['daily_return'] = g.close / g.close.shift(1)
res.append(g)
return pd.concat(res)
stock_data = stock_data.assign(dummy=stock_data['code'].str[0])
Parallel(n_jobs=-1)(delayed(group_func)(group) for _, group in stock_data.groupby('dummy'))
Лично я рекомендовал бы использовать dask, согласно этой теме.
Как отметил @chrisb, многопроцессорная обработка с использованием панд в python может привести к ненужным накладным расходам. Это может также не работать так же хорошо, как многопоточность или даже как один поток.
Dask создан специально для мультипроцессинга.