Python многопроцессорный pool.map для нескольких аргументов
В многопроцессорной библиотеке Python есть вариант pool.map, который поддерживает несколько аргументов?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
25 ответов
Ответ на это зависит от версии и ситуации. Наиболее общий ответ для последних версий Python (начиная с версии 3.3) был впервые описан JF Sebastian.1 Он использует Pool.starmap
метод, который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их данной функции:
import multiprocessing
from itertools import product
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with multiprocessing.Pool(processes=3) as pool:
results = pool.starmap(merge_names, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
Для более ранних версий Python вам нужно написать вспомогательную функцию для явной распаковки аргументов. Если вы хотите использоватьwith
Вам также нужно написать обертку, чтобы включитьPool
в контекстный менеджер. (Спасибо Мюону за указание на это.)
import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
return '{} & {}'.format(a, b)
def merge_names_unpack(args):
return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(merge_names_unpack, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
В более простых случаях с фиксированным вторым аргументом вы также можете использоватьpartial
, но только в Python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(partial(merge_names, b='Sons'), names)
print(results)
# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Во многом это было вдохновлено его ответом, который, вероятно, следовало бы принять вместо этого.Но так как эта книга застряла на вершине, лучше всего ее улучшить для будущих читателей.
Есть ли вариант pool.map, который поддерживает несколько аргументов?
Python 3.3 включает в себя pool.starmap()
метод:
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
return a + b
def main():
a_args = [1,2,3]
second_arg = 1
with Pool() as pool:
L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
M = pool.starmap(func, zip(a_args, repeat(second_arg)))
N = pool.map(partial(func, b=second_arg), a_args)
assert L == M == N
if __name__=="__main__":
freeze_support()
main()
Для более старых версий:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
print a, b
def func_star(a_b):
"""Convert `f([1,2])` to `f(1,2)` call."""
return func(*a_b)
def main():
pool = Pool()
a_args = [1,2,3]
second_arg = 1
pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
freeze_support()
main()
Выход
1 1
2 1
3 1
Обратите внимание, как itertools.izip()
а также itertools.repeat()
используются здесь.
Из- за ошибки, упомянутой @unutbu, вы не можете использовать functools.partial()
или аналогичные возможности в Python 2.6, так что простая функция-обертка func_star()
должны быть определены явно. Смотрите также обходной путь, предложенный uptimebox
,
Думаю ниже будет лучше
def multi_run_wrapper(args):
return add(*args)
def add(x,y):
return x+y
if __name__ == "__main__":
from multiprocessing import Pool
pool = Pool(4)
results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
print results
выход
[3, 5, 7]
Использование Python 3.3+ с pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool
def write(i, x):
print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"]
pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()
Результат:
1 --- 4
2 --- 5
3 --- 6
Вы также можете zip() больше аргументов, если вам нравится: zip(a,b,c,d,e)
Если вы хотите, чтобы в качестве аргумента передавалось постоянное значение, вы должны использовать import itertools
а потом zip(itertools.repeat(constant), a)
например.
# "Как принять несколько аргументов".
def f1(args):
a, b, c = args[0] , args[1] , args[2]
return a+b+c
if __name__ == "__main__":
import multiprocessing
pool = multiprocessing.Pool(4)
result1 = pool.map(f1, [ [1,2,3] ])
print(result1)
Узнав об itertools в ответе JF Sebastian, я решил сделать еще один шаг и написать parmap
пакет, который заботится о распараллеливании, предлагая map
а также starmap
функции на python-2.7 и python-3.2 (а также позже), которые могут принимать любое количество позиционных аргументов.
Монтаж
pip install parmap
Как распараллелить:
import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
Я загрузил parmap в PyPI и в репозиторий github.
В качестве примера на вопрос можно ответить следующим образом:
import parmap
def harvester(case, text):
X = case[0]
text+ str(X)
if __name__ == "__main__":
case = RAW_DATASET # assuming this is an iterable
parmap.map(harvester, case, "test", chunksize=1)
Там вилка multiprocessing
называется пафос (примечание: используйте версию на github), который не нужен starmap
- функции карты отражают API для карты Python, поэтому карта может принимать несколько аргументов. С pathos
Вы также можете выполнять многопроцессорную обработку в интерпретаторе вместо того, чтобы застрять в __main__
блок. После небольшого обновления ожидается выпуск Pathos - в основном, переход на Python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
... print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
Для меня ниже было короткое и простое решение:
from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint
def dosomething(var,s):
sleep(randint(1,5))
print(var)
return var + s
array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
resp_ = pool.map(partial(dosomething,s="2"), array)
print(resp_)
Выход:
a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']
Лучше использовать декоратор вместо написания функции-оболочки вручную. Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая написания оболочки для каждой функции. Обычно декорированная функция не является кражей, однако мы можем использовать functools
обойти это. Больше рассуждений можно найти здесь.
Вот пример
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
Тогда вы можете сопоставить его с заархивированными аргументами
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
Конечно, вы всегда можете использовать Pool.starmap
в Python 3 (>=3.3), как указано в других ответах.
Лучшее решение для python2:
from multiprocessing import Pool
def func((i, (a, b))):
print i, a, b
return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
2 3 4
1 2 3
0 1 2
из[]:
[3, 5, 7]
Другой способ - передать список списков подпрограмме с одним аргументом:
import os
from multiprocessing import Pool
def task(args):
print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
[1,2],
[3,4],
[5,6],
[7,8]
])
Можно создать список списков аргументов с помощью любимого метода.
Вы можете использовать следующие две функции, чтобы избежать написания оболочки для каждой новой функции:
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
function, args = input_pair
return function(*args)
def pool_args(function, *args):
return zip(itertools.repeat(function), zip(*args))
Используйте функцию function
со списками аргументов arg_0
, arg_1
а также arg_2
следующее:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
Другая простая альтернатива - заключить параметры вашей функции в кортеж, а затем обернуть параметры, которые также должны быть переданы в кортежи. Это, возможно, не идеально, когда имеешь дело с большими кусками данных. Я считаю, что это будет делать копии для каждого кортежа.
from multiprocessing import Pool
def f((a,b,c,d)):
print a,b,c,d
return a + b + c +d
if __name__ == '__main__':
p = Pool(10)
data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
print(p.map(f, data))
p.close()
p.join()
Дает вывод в некотором случайном порядке:
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Вот еще один способ сделать это, IMHO более простой и элегантный, чем любой из других предоставленных ответов.
В этой программе есть функция, которая принимает два параметра, распечатывает их, а также выводит сумму:
import multiprocessing
def main():
with multiprocessing.Pool(10) as pool:
params = [ (2, 2), (3, 3), (4, 4) ]
pool.starmap(printSum, params)
# end with
# end function
def printSum(num1, num2):
mySum = num1 + num2
print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function
if __name__ == '__main__':
main()
вывод:
num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8
См. Документацию Python для получения дополнительной информации:
https://docs.python.org/3/library/multiprocessing.html
В частности, обязательно ознакомьтесь с starmap
функция.
Я использую Python 3.6, не уверен, что это будет работать со старыми версиями Python.
Я не уверен, почему в документации нет такого простого примера, как этот.
В python 3.4.4 вы можете использовать multiprocessing.get_context(), чтобы получить объект контекста для использования нескольких методов запуска:
import multiprocessing as mp
def foo(q, h, w):
q.put(h + ' ' + w)
print(h + ' ' + w)
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,'hello', 'world'))
p.start()
print(q.get())
p.join()
Или вы просто замените
pool.map(harvester(text,case),case, 1)
от:
pool.apply_async(harvester(text,case),case, 1)
В официальной документации говорится, что она поддерживает только один итеративный аргумент. Мне нравится использовать apply_async в таких случаях. В вашем случае я бы сделал:
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
Здесь много ответов, но ни один из них не предоставляет Python 2/3-совместимый код, который будет работать на любой версии. Если вы хотите, чтобы ваш код просто работал, это будет работать для любой версии Python:
# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
from contextlib import contextmanager
@contextmanager
def multiprocessing_context(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
else:
multiprocessing_context = multiprocessing.Pool
После этого вы можете использовать многопроцессорную обработку обычным способом Python 3, как вам нравится. Например:
def _function_to_run_for_each(x):
return x.lower()
with multiprocessing_context(processes=3) as pool:
results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim']) print(results)
будет работать в Python 2 или Python 3.
import time
from multiprocessing import Pool
def f1(args):
vfirst, vsecond, vthird = args[0] , args[1] , args[2]
print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
pass
if __name__ == '__main__':
p = Pool()
result = p.map(f1, [['Dog','Cat','Mouse']])
p.close()
p.join()
print(result)
Это пример процедуры, которую я использую для передачи нескольких аргументов в функцию с одним аргументом, используемую в вилке pool.imap:
from multiprocessing import Pool
# Wrapper of the function to map:
class makefun:
def __init__(self, var2):
self.var2 = var2
def fun(self, i):
var2 = self.var2
return var1[i] + var2
# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]
# Open the pool:
pool = Pool(processes=2)
# Wrapper loop
for j in range(len(var2)):
# Obtain the function to map
pool_fun = makefun(var2[j]).fun
# Fork loop
for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
print(var1[i], '+' ,var2[j], '=', value)
# Close the pool
pool.close()
text = "test"
def unpack(args):
return args[0](*args[1:])
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
# args is a list of tuples
# with the function to execute as the first item in each tuple
args = [(harvester, text, c) for c in case]
# doing it this way, we can pass any function
# and we don't need to define a wrapper for each different function
# if we need to use more than one
pool.map(unpack, args)
pool.close()
pool.join()
Это может быть другой вариант. Хитрость в
wrapper
функция, которая возвращает другую функцию, которая передается в
pool.map
. Приведенный ниже код считывает входной массив и для каждого (уникального) элемента в нем возвращает, сколько раз (т.е. подсчитывает) этот элемент появляется в массиве, например, если вход
np.eye(3) = [ [1. 0. 0.]
[0. 1. 0.]
[0. 0. 1.]]
тогда ноль появляется 6 раз и единица 3 раза
import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count
def extract_counts(label_array):
labels = np.unique(label_array)
out = extract_counts_helper([label_array], labels)
return out
def extract_counts_helper(args, labels):
n = max(1, cpu_count() - 1)
pool = ThreadPool(n)
results = {}
pool.map(wrapper(args, results), labels)
pool.close()
pool.join()
return results
def wrapper(argsin, results):
def inner_fun(label):
label_array = argsin[0]
counts = get_label_counts(label_array, label)
results[label] = counts
return inner_fun
def get_label_counts(label_array, label):
return sum(label_array.flatten() == label)
if __name__ == "__main__":
img = np.ones([2,2])
out = extract_counts(img)
print('input array: \n', img)
print('label counts: ', out)
print("========")
img = np.eye(3)
out = extract_counts(img)
print('input array: \n', img)
print('label counts: ', out)
print("========")
img = np.random.randint(5, size=(3, 3))
out = extract_counts(img)
print('input array: \n', img)
print('label counts: ', out)
print("========")
У вас должно получиться:
input array:
[[1. 1.]
[1. 1.]]
label counts: {1.0: 4}
========
input array:
[[1. 0. 0.]
[0. 1. 0.]
[0. 0. 1.]]
label counts: {0.0: 6, 1.0: 3}
========
input array:
[[4 4 0]
[2 4 3]
[2 3 1]]
label counts: {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========
Если вы не состоите в браке с
multiprocessing
модуль, настоятельно рекомендую ознакомиться с :
from concurrent import futures
def add(a, b):
return a + b
with futures.ThreadPoolExecutor(max_workers=4) as e:
for r in e.map(add, [1, 2, 3], [1]*3):
print(r)
У него есть некоторые преимущества, о которых до сих пор не упоминалось:
- Вы можете легко получить доступ к прогрессу (например, для индикаторов выполнения, таких как
tqdm
) - В отличие от многопроцессорности,
concurrent.futures
работает сipython
(обсуждение здесь).
Храните все свои аргументы в виде МАССЫ ТАБЛ.
Пример скажем, что обычно вы вызываете свою функцию как
def mainImage(fragCoord : vec2, iResolution : vec3, iTime : float) -> vec3:
вместо этого передайте один кортеж и распакуйте аргументы
def mainImage(package_iter) -> vec3:
fragCoord=package_iter[0]
iResolution=package_iter[1]
iTime=package_iter[2]
Создайте кортеж, используя цикл перед рукой
package_iter = []
iResolution = vec3(nx,ny,1)
for j in range( (ny-1), -1, -1):
for i in range( 0, nx, 1):
fragCoord : vec2 = vec2(i,j)
time_elapsed_seconds = 10
package_iter.append( (fragCoord, iResolution, time_elapsed_seconds) )
затем выполните все, используя карту, передав МАССИВ ШАБЛОНОВ
array_rgb_values = []
with concurrent.futures.ProcessPoolExecutor() as executor:
for val in executor.map(mainImage, package_iter):
fragColor=val
ir = clip( int(255* fragColor.r), 0, 255)
ig = clip(int(255* fragColor.g), 0, 255)
ib= clip(int(255* fragColor.b), 0, 255)
array_rgb_values.append( (ir,ig,ib) )
Я знаю, что в Python есть * и ** для распаковки, но я их еще не пробовал. Также лучше использовать параллельные фьючерсы библиотеки более высокого уровня, чем библиотеку многопроцессорной обработки низкого уровня
Немного другой подход — этот пример предназначен для загрузки набора файлов.
from multiprocessing import Pool
def download_file(batch):
items_to_grab, var1, var2, etc. = batch
...
##batch yourself instead of using pool.map's chunk argument
batches = list(batch(items_to_grab, 200))
##now create tuples out of each chunk and add other variables you want to send along
batches = [(x, var1, var2, etc.) for x in batches]
with Pool(5) as p:
results = p.map(download_file, batches)
Для python2 вы можете использовать этот трюк
def fun(a,b):
return a+b
pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))