Варианты ускорения кода Python через распараллеливание / многопроцессорность
Ниже я собрал 4 способа завершить выполнение кода, который включает сортировку обновлений Pandas Dataframes.
Я хотел бы применить лучшие методы для ускорения выполнения кода. Я использую лучшие доступные методы?
Кто-нибудь, пожалуйста, поделитесь своими мыслями о следующих идеях?
Я зацикливаюсь на фрейме данных, потому что процесс решения моей проблемы, кажется, требует его. Будет ли увеличение скорости при использовании Dask Dataframes?
Может ли версия Dask Distributed получить выгоду от установки определенного числа рабочих, процессов, потоков на одного рабочего? Люди отмечают, что увеличение количества процессов вместо потоков (или наоборот) лучше всего подходит для некоторых случаев.
Какую аппаратную инфраструктуру можно использовать для такого рода кода? Многопроцессорная версия еще быстрее на экземпляре AWS с большим количеством физических процессорных ядер.
- Будет ли установка Kubernetes/AWS с Dask Distributed намного быстрее?
- Может ли это быть легко адаптировано для запуска с помощью графического процессора локально или на экземпляре AWS с несколькими графическими процессорами?
Это время завершения для справки:
- Регулярный цикл For:
34 seconds
- Задержка:
21 seconds
- Распределенная Dask (локальная машина):
21 seconds
- многопроцессорные:
10 seconds
from dask.distributed import Client
from multiprocessing import Pool
from dask import delayed
import pandas as pd
import numpy as np
client = Client()
import random
import dask
#Setting original input data that will be used in the functions
alist=['A','B','C','D','E','F','G','H','I']
set_table=pd.DataFrame({"A":alist,
"B":[i for i in range(1,10)],
"C":[i for i in range(11,20)],
"D":[0]*9})
#Assembled random list of combinations
criteria_list=[]
for i in range(0,10000):
criteria_list.append(random.sample(alist,6))
#Sorts and filters the original df
def one_filter_sorter(criteria):
sorted_table=set_table[set_table['A'].isin(criteria)]
sorted_table=sorted_table.sort_values(['B','C'],ascending=True)
return sorted_table
#Exists to help the function below. Simplified for this example
def helper_function(sorted_table,idx):
if alist.index(sorted_table.loc[idx,'A'])>5:
return True
#last function that retuns the gathered result
def two_go_downrows(sorted_table):
for idx, row in sorted_table.iterrows():
if helper_function(sorted_table,idx)==True:
sorted_table.loc[idx,'D'] = 100 - sorted_table.loc[idx,'C']
res=sorted_table.loc[:,['A','D']].to_dict()
return res
#--Loop version
result=[]
for criteria in criteria_list:
A=one_filter_sorter(criteria)
B=two_go_downrows(A)
result.append(B)
#--Multiprocessed version
result=[]
if __name__ == '__main__':
pool=Pool(processes=6)
A=pool.map(one_filter_sorter, criteria)
B=pool.map(two_go_downrows, A)
result.append(B)
#--Delayed version
result=[]
for criteria in criteria_list:
A=delayed(one_filter_sorter)(criteria)
B=delayed(two_go_downrows)(A)
result.append(B)
dask.compute(result)
#--Distributed version
A= client.map(one_filter_sorter,criteria_list)
B= client.map(two_go_downrows,A)
client.gather(B)
Спасибо