Как распараллелить простой цикл Python?
Это, вероятно, тривиальный вопрос, но как мне распараллелить следующий цикл в Python?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
Я знаю, как запускать отдельные потоки в Python, но я не знаю, как "собрать" результаты.
Многократные процессы были бы также хороши - что бы ни было проще для этого случая. В настоящее время я использую Linux, но код также должен работать на Windows и Mac.
Какой самый простой способ распараллелить этот код?
15 ответов
Использование нескольких потоков в CPython не даст вам лучшей производительности для чистого Python-кода из-за глобальной блокировки интерпретатора (GIL). Я предлагаю использовать multiprocessing
модуль вместо:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Обратите внимание, что это не будет работать в интерактивном переводчике.
Чтобы избежать обычного FUD вокруг GIL: в любом случае не было бы никакого преимущества в использовании потоков для этого примера. Здесь вы хотите использовать процессы, а не потоки, потому что они позволяют избежать целого ряда проблем.
from joblib import Parallel, delayed
import multiprocessing
inputs = range(10)
def processInput(i):
return i * i
num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)
Вышеописанное прекрасно работает на моей машине (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib
).
Взято с https://blog.dominodatalab.com/simple-parallelization/
Это самый простой способ сделать это!
Вы можете использовать asyncio. (Документацию можно найти здесь). Он используется в качестве основы для нескольких асинхронных фреймворков Python, которые обеспечивают высокопроизводительные сетевые и веб-серверы, библиотеки подключения к базам данных, распределенные очереди задач и т. Д. Кроме того, он имеет API высокого и низкого уровня для решения любых проблем..
import asyncio
def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
return wrapped
@background
def your_function(argument):
#code
Теперь эта функция будет запускаться параллельно при каждом вызове без перевода основной программы в состояние ожидания. Вы также можете использовать его для распараллеливания цикла for. При вызове цикла for, хотя цикл является последовательным, но каждая итерация выполняется параллельно основной программе, как только туда попадает интерпретатор.Например:
@background
def your_function(argument):
time.sleep(5)
print('function finished for '+str(argument))
for i in range(10):
your_function(i)
print('loop finished')
Это дает следующий результат:
loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
Чтобы распараллелить простой цикл for, joblib приносит большую пользу необработанному использованию многопроцессорной обработки. Не только короткий синтаксис, но и такие вещи, как прозрачное группирование итераций, когда они выполняются очень быстро (для устранения накладных расходов) или захват трассировки дочернего процесса, для лучшей отчетности об ошибках.
Отказ от ответственности: я оригинальный автор joblib.
Какой самый простой способ распараллелить этот код?
мне действительно нравится concurrent.futures
для этого доступно в Python3 начиная с версии 3.2 - и через бэкпорт до 2.6 и 2.7 на PyPi.
Вы можете использовать потоки или процессы и использовать точно такой же интерфейс.
многопроцессорная обработка
Поместите это в файл - futuretest.py:
import concurrent.futures
import time, random # add some random sleep time
offset = 2 # you don't supply these so
def calc_stuff(parameter=None): # these are examples.
sleep_time = random.choice([0, 1, 2, 3, 4, 5])
time.sleep(sleep_time)
return parameter / 2, sleep_time, parameter * parameter
def procedure(j): # just factoring out the
parameter = j * offset # procedure
# call the calculation
return calc_stuff(parameter=parameter)
def main():
output1 = list()
output2 = list()
output3 = list()
start = time.time() # let's see how long this takes
# we can swap out ProcessPoolExecutor for ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(procedure, range(0, 10)):
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
finish = time.time()
# these kinds of format strings are only available on Python 3.6:
# time to upgrade!
print(f'original inputs: {repr(output1)}')
print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
print(f'returned in order given: {repr(output3)}')
if __name__ == '__main__':
main()
И вот вывод:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Многопоточность
Теперь поменяй ProcessPoolExecutor
в ThreadPoolExecutor
и снова запустите модуль:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Теперь вы сделали как многопоточность, так и многопроцессорность!
Обратите внимание на производительность и использование обоих вместе.
Выборка слишком мала, чтобы сравнить результаты.
Тем не менее, я подозреваю, что многопоточность будет быстрее, чем многопроцессорность в целом, особенно в Windows, поскольку Windows не поддерживает разветвление, поэтому каждому новому процессу требуется время для запуска. На Linux или Mac они, вероятно, будут ближе.
Вы можете вкладывать несколько потоков в несколько процессов, но рекомендуется не использовать несколько потоков для выделения нескольких процессов.
Есть несколько преимуществ использования Ray:
- Вы можете распараллеливать на нескольких машинах в дополнение к нескольким ядрам (с одним и тем же кодом).
- Эффективная обработка числовых данных через разделяемую память (и сериализация без копирования).
- Высокая пропускная способность при распределенном планировании.
- Отказоустойчивость.
В вашем случае вы можете запустить Ray и определить удаленную функцию
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
а затем вызвать его параллельно
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
Чтобы запустить тот же пример в кластере, единственной строкой, которая могла бы измениться, был бы вызов ray.init(). Соответствующую документацию можно найти здесь.
Обратите внимание, что я помогаю развивать Рэя.
Я нашел joblib
очень полезно со мной. Пожалуйста, смотрите следующий пример:
from joblib import Parallel, delayed
def yourfunction(k):
s=3.14*k*k
print "Area of a circle with a radius ", k, " is:", s
element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))
n_jobs = -1: использовать все доступные ядра
Фьючерсы Dask; Я удивлен, что об этом еще никто не упомянул. . .
from dask.distributed import Client
client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)
def my_function(i):
output = <code to execute in the for loop here>
return output
futures = []
for i in <whatever you want to loop across here>:
future = client.submit(my_function, i)
futures.append(future)
results = client.gather(futures)
client.close()
Спасибо @iuryxavier
from multiprocessing import Pool
from multiprocessing import cpu_count
def add_1(x):
return x + 1
if __name__ == "__main__":
pool = Pool(cpu_count())
results = pool.map(add_1, range(10**12))
pool.close() # 'TERM'
pool.join() # 'KILL'
Почему вы не используете потоки и один мьютекс для защиты одного глобального списка?
import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
def __init__ (self,param):
Thread.__init__(self)
self.param = param
def run(self):
mutex.acquire()
output.append(calc_stuff(self.param))
mutex.release()
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
current = thread_it(j * offset)
threads.append(current)
current.start()
for t in threads:
t.join()
#here you have output list filled with data
имейте в виду, вы будете так быстро, как ваша самая медленная нить
В параллельных упаковщики со стороны библиотеки tqdm это отличный способ распараллеливания уже запущенного кода. tqdm предоставляет обратную связь о текущем прогрессе и оставшемся времени с помощью интеллектуального индикатора выполнения, который я считаю очень полезным для длительных вычислений.
Циклы можно переписать так, чтобы они выполнялись как параллельные потоки, простым вызовом
thread_map
, или как одновременные несколько процессов через простой вызов
process_map
:
from tqdm.contrib.concurrent import thread_map, process_map
def calc_stuff(num, multiplier):
import time
time.sleep(1)
return num, num * multiplier
if __name__ == "__main__":
# let's parallelize this for loop:
# results = [calc_stuff(i, 2) for i in range(64)]
loop_idx = range(64)
multiplier = [2] * len(loop_idx)
# either with threading:
results_threading = thread_map(calc_stuff, loop_idx, multiplier)
# or with multi-processing:
results_processes = process_map(calc_stuff, loop_idx, multiplier)
Допустим, у нас есть асинхронная функция
async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
# Do some async procesing
Это должно быть запущено на большом массиве. Некоторые атрибуты передаются в программу, а некоторые используются из свойства элемента словаря в массиве.
async def process_students(self, student_name: str, loop):
market = sys.argv[2]
subjects = [...] #Some large array
batchsize = 5
for i in range(0, len(subjects), batchsize):
batch = subjects[i:i+batchsize]
await asyncio.gather(*(self.work_async(student_name,
sub['Code'],
loop)
for sub in batch))
Это может быть полезно при реализации многопроцессорных и параллельных / распределенных вычислений в Python.
Учебник YouTube по использованию пакета techila
Techila - это промежуточное программное обеспечение для распределенных вычислений, которое напрямую интегрируется с Python с помощью пакета techila. Функция персика в пакете может быть полезна для распараллеливания структур цикла. (Следующий фрагмент кода с форумов сообщества Techila)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
Посмотри на это;
http://docs.python.org/library/queue.html
Это может быть неправильный способ сделать это, но я бы сделал что-то вроде;
Актуальный код;
from multiprocessing import Process, JoinableQueue as Queue
class CustomWorker(Process):
def __init__(self,workQueue, out1,out2,out3):
Process.__init__(self)
self.input=workQueue
self.out1=out1
self.out2=out2
self.out3=out3
def run(self):
while True:
try:
value = self.input.get()
#value modifier
temp1,temp2,temp3 = self.calc_stuff(value)
self.out1.put(temp1)
self.out2.put(temp2)
self.out3.put(temp3)
self.input.task_done()
except Queue.Empty:
return
#Catch things better here
def calc_stuff(self,param):
out1 = param * 2
out2 = param * 4
out3 = param * 8
return out1,out2,out3
def Main():
inputQueue = Queue()
for i in range(10):
inputQueue.put(i)
out1 = Queue()
out2 = Queue()
out3 = Queue()
processes = []
for x in range(2):
p = CustomWorker(inputQueue,out1,out2,out3)
p.daemon = True
p.start()
processes.append(p)
inputQueue.join()
while(not out1.empty()):
print out1.get()
print out2.get()
print out3.get()
if __name__ == '__main__':
Main()
Надеюсь, это поможет.
Очень простой пример параллельной обработки
from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()