Как сократить время многопроцессорной обработки в python
Я пытаюсь построить многопроцессорную работу в python, чтобы уменьшить скорость вычислений, но, похоже, после многопроцессорной обработки общая скорость вычислений значительно снизилась. Я создал 4 разных процесса и разделил dataFrame на 4 разных dataframe, которые будут входом для каждого процесса. После определения сроков каждого процесса кажется, что накладные расходы значительны, и мне было интересно, есть ли способ уменьшить эти накладные расходы.
Я использую windows7, python 3.5 и моя машина имеет 8 ядер.
def doSomething(args, dataPassed,):
processing data, and calculating outputs
def parallelize_dataframe(df, nestedApply):
df_split = np.array_split(df, 4)
pool = multiprocessing.Pool(4)
df = pool.map(nestedApply, df_split)
print ('finished with Simulation')
time = float((dt.datetime.now() - startTime).total_seconds())
pool.close()
pool.join()
def nestedApply(df):
func2 = partial(doSomething, args=())
res = df.apply(func2, axis=1)
res = [output Tables]
return res
if __name__ == '__main__':
data = pd.read_sql_query(query, conn)
parallelize_dataframe(data, nestedApply)
1 ответ
Я бы предложил использовать очереди вместо предоставления вашего DataFrame в виде кусков. Вам нужно много ресурсов для копирования каждого чанка, и на это уходит довольно много времени. Вам может не хватить памяти, если ваш DataFrame действительно большой. Используя очереди, вы можете выиграть от быстрых итераторов в пандах. Вот мой подход. Накладные расходы уменьшаются со сложностью ваших работников. К сожалению, мои работники далеко не просты, чтобы действительно показать это, но sleep
немного имитирует сложность.
import pandas as pd
import multiprocessing as mp
import numpy as np
import time
def worker(in_queue, out_queue):
for row in iter(in_queue.get, 'STOP'):
value = (row[1] * row[2] / row[3]) + row[4]
time.sleep(0.1)
out_queue.put((row[0], value))
if __name__ == "__main__":
# fill a DataFrame
df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD'))
in_queue = mp.Queue()
out_queue = mp.Queue()
# setup workers
numProc = 2
process = [mp.Process(target=worker,
args=(in_queue, out_queue)) for x in range(numProc)]
# run processes
for p in process:
p.start()
# iterator over rows
it = df.itertuples()
# fill queue and get data
# code fills the queue until a new element is available in the output
# fill blocks if no slot is available in the in_queue
for i in range(len(df)):
while out_queue.empty():
# fill the queue
try:
row = next(it)
in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True) # row = (index, A, B, C, D) tuple
except StopIteration:
break
row_data = out_queue.get()
df.loc[row_data[0], "Result"] = row_data[1]
# signals for processes stop
for p in process:
in_queue.put('STOP')
# wait for processes to finish
for p in process:
p.join()
С помощью numProc = 2
это занимает 50 секунд за цикл, с numProc = 4
это в два раза быстрее.