Распараллелить применение после групповых панд

Я использовал rosetta.parallel.pandas_easy для распараллеливания применять после группы, например:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)

Однако кто-нибудь выяснил, как распараллелить функцию, которая возвращает фрейм данных? Этот код не работает для розетты, как и ожидалось.

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)

8 ответов

Решение

Кажется, это работает, хотя это действительно должно быть встроено в панд

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
    print 'parallel version: '
    print applyParallel(df.groupby(df.index), tmpFunc)

    print 'regular version: '
    print df.groupby(df.index).apply(tmpFunc)

    print 'ideal version (does not work): '
    print df.groupby(df.index).applyParallel(tmpFunc)

Ответ Ивана великолепен, но, похоже, его можно немного упростить, также устраняя необходимость зависеть от joblib:

from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)

Между прочим: это не может заменить любой groupby.apply(), но будет охватывать типичные случаи: например, это должно охватывать случаи 2 и 3 в документации, в то время как вы должны получить поведение случая 1, задав аргумент axis=1 в финал pandas.concat() вызов.

У меня есть хак, который я использую для получения распараллеливания в Pandas. Я разбиваю свой блок данных на куски, помещаю каждый блок в элемент списка, а затем использую параллельные биты ipython для параллельного применения к списку блоков данных. Затем я собрал список, используя панд concat функция.

Однако это не всегда применимо. Это работает для меня, потому что функция, которую я хочу применить к каждому фрагменту кадра данных, занимает около минуты. И разборка и сбор моих данных не займет много времени. Так что это явно клудж. С учетом сказанного, вот пример. Я использую ноутбук Ipython, так что вы увидите %%time магия в моем коде:

## make some example data
import pandas as pd

np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
                   'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')

Для этого примера я собираюсь сделать "чанки", основанные на вышеупомянутой группировке, но это не должно быть то, как данные разбиваются на части. Хотя это довольно распространенная модель.

dflist = []
for name, group in grouped:
    dflist.append(group)

установить параллельные биты

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True

написать глупую функцию, чтобы применить к нашим данным

def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf

Теперь давайте запустим код последовательно, затем параллельно. сериал первый:

%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s

теперь параллельно

%%time
parallel_list = lview.map(myFunc, dflist)

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s

тогда потребуется всего несколько мс, чтобы объединить их обратно в один фрейм данных

%%time
combinedDf = pd.concat(parallel_list)
 CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms

Я использую 6 движков IPython на моем MacBook, но вы можете видеть, что он уменьшает время выполнения до 14 с.

Для действительно продолжительного стохастического моделирования я могу использовать бэкэнд AWS, запустив кластер с помощью StarCluster. Однако большую часть времени я распараллеливаю только 8 процессоров на моем MBP.

Краткий комментарий, сопровождающий ответ Д.Д. Лонга. Я обнаружил, что если число групп очень велико (скажем, сотни тысяч), и ваша функция apply выполняет что-то довольно простое и быстрое, то разбивает ваш фрейм данных на куски и назначает каждый блок работнику для выполнения groupby-apply (в последовательном режиме) может быть намного быстрее, чем выполнение параллельного groupby-apply, когда рабочие считывают очередь, содержащую множество групп. Пример:

import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})

Итак, наш фрейм данных выглядит так:

    a
0   3425
1   1016
2   8141
3   9263
4   8018

Обратите внимание, что столбец "а" имеет много групп (например, идентификаторы клиентов):

len(df.a.unique())
15000

Функция для работы с нашими группами:

def f1(group):
    time.sleep(0.0001)
    return group

Начать пул:

ppe = ProcessPoolExecutor(12)
futures = []
results = []

Делаем параллельные групповые заявки:

%%time

for name, group in df.groupby('a'):
    p = ppe.submit(f1, group)
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results)
del ppe

CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s

Давайте теперь добавим столбец, который делит df на гораздо меньшее количество групп:

df['b'] = np.random.randint(0, 12, nrows)

Теперь вместо 15000 групп их всего 12:

len(df.b.unique())
12

Мы разделим наш df и сделаем групповое применение для каждого чанка.

ppe = ProcessPoolExecutor(12)

Оболочка веселья:

def f2(df):
    df.groupby('a').apply(f1)
    return df

Отправьте каждый чанк, который будет обработан в серии:

%%time

for i in df.b.unique():
    p = ppe.submit(f2, df[df.b==i])
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results) 

CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s

Обратите внимание, что количество времени, проведенного на группу, не изменилось. Скорее всего, изменилась длина очереди, с которой работники зачитывают. Я подозреваю, что происходит то, что рабочие не могут получить доступ к общей памяти одновременно, и постоянно возвращаются, чтобы считать из очереди, и, таким образом, наступают друг другу на пальцы ног. При работе с большими кусками рабочие возвращаются реже, и поэтому эта проблема уменьшается, и общее выполнение выполняется быстрее.

ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: я являюсь владельцем и основным участником/сопровождающим

— это пакет Python, который я создал более 4 лет назад как пакет, который эффективно применяет любую функцию к фрейму данных или серии pandas самым быстрым доступным способом. На сегодняшний день имеет более 2 тысяч звезд GitHub, 250 тысяч загрузок в месяц и покрытие кода 95%.

Начиная с версии 1.3.2,swifterпредлагает простой интерфейс для производительной параллельной группы с применением:

      df.swifter.groupby(df.index).apply(tmpFunc)

Я также создал эталонные тесты производительности , демонстрирующие улучшение производительности Swifter, с ключевым визуальным образцом, воспроизведенным здесь:Swifter Groupby Apply Performance Benchmark

Вы можете легко установить быстрее (с функцией группового применения) либо через pip:

      pip install swifter[groupby]>=1.3.2

или через конду:

      conda install -c conda-forge swifter>=1.3.2 ray>=1.0.0

Пожалуйста, ознакомьтесь с swifterREADME и документацией для получения дополнительной информации.

Люди переходят на использование бодо для параллелизма. Это самый быстрый движок, доступный для распараллеливания Python, поскольку он компилирует ваш код с помощью MPI. Его новый компилятор сделал его намного быстрее, чем Dask, Ray, multiprocessing, pandarel и т. д. Прочтите о бодо и даске в этом сообщении в блоге и посмотрите, что Трэвис говорит о бодо в своем LinkedIn! Он является основателем Anaconda: Цитата: «бодо - это настоящая сделка».

https://bodo.ai/blog/performance-and-cost-of-bodo-vs-spark-dask-ray

https://www.linkedin.com/posts/teoliphant_performance-and-cost-evaluation-of-bodo-vs-activity-6873290539773632512-y5iZ/

Что касается того, как использовать groupby с бодо, здесь я пишу пример кода:

      #install bodo through your terminal

conda create -n Bodo python=3.9 -c conda-forge
conda activate Bodo
conda install bodo -c bodo.ai -c conda-forge

Вот пример кода для groupby:

      import time
import pandas as pd
import bodo


@bodo.jit
def read_data():
""" a dataframe with 2 columns, headers: 'A', 'B' 
or you can just create a data frame instead of reading it from flat file 
"""
    return pd.read_parquet("your_input_data.pq")


@bodo.jit
def data_groupby(input_df):
    t_1 = time.time()
    df2 = input_df.groupby("A", as_index=False).sum()
    t_2 = time.time()
    print("Compute time: {:.2f}".format(t_2-t_1))
    return df2, t_2-t_1


if __name__ == "__main__":
    df = read_data()
    t0 = time.time()
    output, compute_time = data_groupby(df)
    t2 = time.time()
    total_time = t2 - t0
    if bodo.get_rank() == 0:
        print("Compilation time: {:.2f}".format(total_time - compute_time))
        print("Total time second call: {:.2f}".format(total_time))

и, наконец, запустите его с помощью mpiexec через свой терминал. -n определяет количество ядер (ЦП), на которых вы хотите его запустить.

      mpiexec -n 4 python filename.py

Вы можете использовать либо multiprocessing или joblibдля достижения распараллеливания. Однако, если количество групп велико и каждый фрейм данных группы большой, время работы может быть хуже, так как вам нужно много раз переносить эти группы в ЦП. Чтобы уменьшить накладные расходы, мы можем сначала разделить данные на большие фрагменты, а затем распараллелить вычисления на этих фрагментах.

Например, предположим, что вы обрабатываете данные о запасах, где вам нужно сгруппировать акции по их коду, а затем рассчитать некоторую статистику. Вы можете сначала сгруппировать по первому символу кода (большие фрагменты), а затем выполнить действия в этой фиктивной группе:

import pandas as pd
from joblib import Parallel, delayed

def group_func(dummy_group):
    # Do something to the group just like doing to the original dataframe.
    #     Example: calculate daily return.
    res = []
    for _, g in dummy_group.groupby('code'):
        g['daily_return']  = g.close / g.close.shift(1)
        res.append(g)
    return pd.concat(res)

stock_data = stock_data.assign(dummy=stock_data['code'].str[0])

Parallel(n_jobs=-1)(delayed(group_func)(group) for _, group in stock_data.groupby('dummy'))

Лично я рекомендовал бы использовать dask, согласно этой теме.

Как отметил @chrisb, многопроцессорная обработка с использованием панд в python может привести к ненужным накладным расходам. Это может также не работать так же хорошо, как многопоточность или даже как один поток.

Dask создан специально для мультипроцессинга.

Другие вопросы по тегам