Как распараллелить простой цикл Python?

Это, вероятно, тривиальный вопрос, но как мне распараллелить следующий цикл в Python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Я знаю, как запускать отдельные потоки в Python, но я не знаю, как "собрать" результаты.

Многократные процессы были бы также хороши - что бы ни было проще для этого случая. В настоящее время я использую Linux, но код также должен работать на Windows и Mac.

Какой самый простой способ распараллелить этот код?

15 ответов

Решение

Использование нескольких потоков в CPython не даст вам лучшей производительности для чистого Python-кода из-за глобальной блокировки интерпретатора (GIL). Я предлагаю использовать multiprocessing модуль вместо:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Обратите внимание, что это не будет работать в интерактивном переводчике.

Чтобы избежать обычного FUD вокруг GIL: в любом случае не было бы никакого преимущества в использовании потоков для этого примера. Здесь вы хотите использовать процессы, а не потоки, потому что они позволяют избежать целого ряда проблем.

from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

Вышеописанное прекрасно работает на моей машине (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib).

Взято с https://blog.dominodatalab.com/simple-parallelization/

Это самый простой способ сделать это!

Вы можете использовать asyncio. (Документацию можно найти здесь). Он используется в качестве основы для нескольких асинхронных фреймворков Python, которые обеспечивают высокопроизводительные сетевые и веб-серверы, библиотеки подключения к базам данных, распределенные очереди задач и т. Д. Кроме того, он имеет API высокого и низкого уровня для решения любых проблем..

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Теперь эта функция будет запускаться параллельно при каждом вызове без перевода основной программы в состояние ожидания. Вы также можете использовать его для распараллеливания цикла for. При вызове цикла for, хотя цикл является последовательным, но каждая итерация выполняется параллельно основной программе, как только туда попадает интерпретатор.Например:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

Это дает следующий результат:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

Чтобы распараллелить простой цикл for, joblib приносит большую пользу необработанному использованию многопроцессорной обработки. Не только короткий синтаксис, но и такие вещи, как прозрачное группирование итераций, когда они выполняются очень быстро (для устранения накладных расходов) или захват трассировки дочернего процесса, для лучшей отчетности об ошибках.

Отказ от ответственности: я оригинальный автор joblib.

Какой самый простой способ распараллелить этот код?

мне действительно нравится concurrent.futures для этого доступно в Python3 начиная с версии 3.2 - и через бэкпорт до 2.6 и 2.7 на PyPi.

Вы можете использовать потоки или процессы и использовать точно такой же интерфейс.

многопроцессорная обработка

Поместите это в файл - futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

И вот вывод:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Многопоточность

Теперь поменяй ProcessPoolExecutor в ThreadPoolExecutorи снова запустите модуль:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Теперь вы сделали как многопоточность, так и многопроцессорность!

Обратите внимание на производительность и использование обоих вместе.

Выборка слишком мала, чтобы сравнить результаты.

Тем не менее, я подозреваю, что многопоточность будет быстрее, чем многопроцессорность в целом, особенно в Windows, поскольку Windows не поддерживает разветвление, поэтому каждому новому процессу требуется время для запуска. На Linux или Mac они, вероятно, будут ближе.

Вы можете вкладывать несколько потоков в несколько процессов, но рекомендуется не использовать несколько потоков для выделения нескольких процессов.

Есть несколько преимуществ использования Ray:

  • Вы можете распараллеливать на нескольких машинах в дополнение к нескольким ядрам (с одним и тем же кодом).
  • Эффективная обработка числовых данных через разделяемую память (и сериализация без копирования).
  • Высокая пропускная способность при распределенном планировании.
  • Отказоустойчивость.

В вашем случае вы можете запустить Ray и определить удаленную функцию

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

а затем вызвать его параллельно

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

Чтобы запустить тот же пример в кластере, единственной строкой, которая могла бы измениться, был бы вызов ray.init(). Соответствующую документацию можно найти здесь.

Обратите внимание, что я помогаю развивать Рэя.

Я нашел joblib очень полезно со мной. Пожалуйста, смотрите следующий пример:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs = -1: использовать все доступные ядра

Фьючерсы Dask; Я удивлен, что об этом еще никто не упомянул. . .

      from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

Спасибо @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

Почему вы не используете потоки и один мьютекс для защиты одного глобального списка?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

имейте в виду, вы будете так быстро, как ваша самая медленная нить

В параллельных упаковщики со стороны библиотеки tqdm это отличный способ распараллеливания уже запущенного кода. tqdm предоставляет обратную связь о текущем прогрессе и оставшемся времени с помощью интеллектуального индикатора выполнения, который я считаю очень полезным для длительных вычислений.

Циклы можно переписать так, чтобы они выполнялись как параллельные потоки, простым вызовом thread_map, или как одновременные несколько процессов через простой вызов process_map:

      from tqdm.contrib.concurrent import thread_map, process_map


def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

Допустим, у нас есть асинхронная функция

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

Это должно быть запущено на большом массиве. Некоторые атрибуты передаются в программу, а некоторые используются из свойства элемента словаря в массиве.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

Это может быть полезно при реализации многопроцессорных и параллельных / распределенных вычислений в Python.

Учебник YouTube по использованию пакета techila

Techila - это промежуточное программное обеспечение для распределенных вычислений, которое напрямую интегрируется с Python с помощью пакета techila. Функция персика в пакете может быть полезна для распараллеливания структур цикла. (Следующий фрагмент кода с форумов сообщества Techila)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

Посмотри на это;

http://docs.python.org/library/queue.html

Это может быть неправильный способ сделать это, но я бы сделал что-то вроде;

Актуальный код;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

Надеюсь, это поможет.

Очень простой пример параллельной обработки

from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()

def yourfunction():

    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter = parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()
Другие вопросы по тегам