Python многопроцессорный pool.map для нескольких аргументов

В многопроцессорной библиотеке Python есть вариант pool.map, который поддерживает несколько аргументов?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()

25 ответов

Решение

Ответ на это зависит от версии и ситуации. Наиболее общий ответ для последних версий Python (начиная с версии 3.3) был впервые описан JF Sebastian.1 Он использует Pool.starmapметод, который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их данной функции:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Для более ранних версий Python вам нужно написать вспомогательную функцию для явной распаковки аргументов. Если вы хотите использоватьwithВам также нужно написать обертку, чтобы включитьPoolв контекстный менеджер. (Спасибо Мюону за указание на это.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

В более простых случаях с фиксированным вторым аргументом вы также можете использоватьpartial, но только в Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Во многом это было вдохновлено его ответом, который, вероятно, следовало бы принять вместо этого.Но так как эта книга застряла на вершине, лучше всего ее улучшить для будущих читателей.

Есть ли вариант pool.map, который поддерживает несколько аргументов?

Python 3.3 включает в себя pool.starmap() метод:

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

Для более старых версий:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

Выход

1 1
2 1
3 1

Обратите внимание, как itertools.izip() а также itertools.repeat() используются здесь.

Из- за ошибки, упомянутой @unutbu, вы не можете использовать functools.partial() или аналогичные возможности в Python 2.6, так что простая функция-обертка func_star() должны быть определены явно. Смотрите также обходной путь, предложенный uptimebox,

Думаю ниже будет лучше

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

выход

[3, 5, 7]

Использование Python 3.3+ с pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Результат:

1 --- 4
2 --- 5
3 --- 6

Вы также можете zip() больше аргументов, если вам нравится: zip(a,b,c,d,e)

Если вы хотите, чтобы в качестве аргумента передавалось постоянное значение, вы должны использовать import itertools а потом zip(itertools.repeat(constant), a) например.

# "Как принять несколько аргументов".

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)

Узнав об itertools в ответе JF Sebastian, я решил сделать еще один шаг и написать parmap пакет, который заботится о распараллеливании, предлагая map а также starmap функции на python-2.7 и python-3.2 (а также позже), которые могут принимать любое количество позиционных аргументов.

Монтаж

pip install parmap

Как распараллелить:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Я загрузил parmap в PyPI и в репозиторий github.

В качестве примера на вопрос можно ответить следующим образом:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

Там вилка multiprocessing называется пафос (примечание: используйте версию на github), который не нужен starmap - функции карты отражают API для карты Python, поэтому карта может принимать несколько аргументов. С pathosВы также можете выполнять многопроцессорную обработку в интерпретаторе вместо того, чтобы застрять в __main__ блок. После небольшого обновления ожидается выпуск Pathos - в основном, переход на Python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

Для меня ниже было короткое и простое решение:

      from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint

def dosomething(var,s):
    sleep(randint(1,5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething,s="2"), array)
    print(resp_)

Выход:

      a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']

Лучше использовать декоратор вместо написания функции-оболочки вручную. Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая написания оболочки для каждой функции. Обычно декорированная функция не является кражей, однако мы можем использовать functools обойти это. Больше рассуждений можно найти здесь.

Вот пример

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Тогда вы можете сопоставить его с заархивированными аргументами

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Конечно, вы всегда можете использовать Pool.starmap в Python 3 (>=3.3), как указано в других ответах.

Лучшее решение для python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

из[]:

[3, 5, 7]

Другой способ - передать список списков подпрограмме с одним аргументом:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

Можно создать список списков аргументов с помощью любимого метода.

Вы можете использовать следующие две функции, чтобы избежать написания оболочки для каждой новой функции:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Используйте функцию function со списками аргументов arg_0, arg_1 а также arg_2 следующее:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()

Другая простая альтернатива - заключить параметры вашей функции в кортеж, а затем обернуть параметры, которые также должны быть переданы в кортежи. Это, возможно, не идеально, когда имеешь дело с большими кусками данных. Я считаю, что это будет делать копии для каждого кортежа.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Дает вывод в некотором случайном порядке:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

Вот еще один способ сделать это, IMHO более простой и элегантный, чем любой из других предоставленных ответов.

В этой программе есть функция, которая принимает два параметра, распечатывает их, а также выводит сумму:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

вывод:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

См. Документацию Python для получения дополнительной информации:

https://docs.python.org/3/library/multiprocessing.html

В частности, обязательно ознакомьтесь с starmap функция.

Я использую Python 3.6, не уверен, что это будет работать со старыми версиями Python.

Я не уверен, почему в документации нет такого простого примера, как этот.

В python 3.4.4 вы можете использовать multiprocessing.get_context(), чтобы получить объект контекста для использования нескольких методов запуска:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Или вы просто замените

pool.map(harvester(text,case),case, 1)

от:

pool.apply_async(harvester(text,case),case, 1)

В официальной документации говорится, что она поддерживает только один итеративный аргумент. Мне нравится использовать apply_async в таких случаях. В вашем случае я бы сделал:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

Здесь много ответов, но ни один из них не предоставляет Python 2/3-совместимый код, который будет работать на любой версии. Если вы хотите, чтобы ваш код просто работал, это будет работать для любой версии Python:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

После этого вы можете использовать многопроцессорную обработку обычным способом Python 3, как вам нравится. Например:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

будет работать в Python 2 или Python 3.

      import time
from multiprocessing import Pool


def f1(args):
    vfirst, vsecond, vthird = args[0] , args[1] , args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
    pass


if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog','Cat','Mouse']])
    p.close()
    p.join()
    print(result)

Это пример процедуры, которую я использую для передачи нескольких аргументов в функцию с одним аргументом, используемую в вилке pool.imap:

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()
text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()

Это может быть другой вариант. Хитрость в wrapper функция, которая возвращает другую функцию, которая передается в pool.map. Приведенный ниже код считывает входной массив и для каждого (уникального) элемента в нем возвращает, сколько раз (т.е. подсчитывает) этот элемент появляется в массиве, например, если вход

np.eye(3) = [ [1. 0. 0.]
              [0. 1. 0.]
              [0. 0. 1.]]

тогда ноль появляется 6 раз и единица 3 раза

import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count


def extract_counts(label_array):
    labels = np.unique(label_array)
    out = extract_counts_helper([label_array], labels)
    return out

def extract_counts_helper(args, labels):
    n = max(1, cpu_count() - 1)
    pool = ThreadPool(n)
    results = {}
    pool.map(wrapper(args, results), labels)
    pool.close()
    pool.join()
    return results

def wrapper(argsin, results):
    def inner_fun(label):
        label_array = argsin[0]
        counts = get_label_counts(label_array, label)
        results[label] = counts
    return inner_fun

def get_label_counts(label_array, label):
    return sum(label_array.flatten() == label)

if __name__ == "__main__":
    img = np.ones([2,2])
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
           
    img = np.eye(3)
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
    
    img = np.random.randint(5, size=(3, 3))
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

У вас должно получиться:

input array: 
 [[1. 1.]
 [1. 1.]]
label counts:  {1.0: 4}
========
input array: 
 [[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
label counts:  {0.0: 6, 1.0: 3}
========
input array: 
 [[4 4 0]
 [2 4 3]
 [2 3 1]]
label counts:  {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========

Если вы не состоите в браке с multiprocessing модуль, настоятельно рекомендую ознакомиться с :

      from concurrent import futures

def add(a, b):
    return a + b

with futures.ThreadPoolExecutor(max_workers=4) as e:
    for r in e.map(add, [1, 2, 3], [1]*3):
        print(r)

У него есть некоторые преимущества, о которых до сих пор не упоминалось:

  1. Вы можете легко получить доступ к прогрессу (например, для индикаторов выполнения, таких как tqdm)
  2. В отличие от многопроцессорности, concurrent.futures работает с ipython(обсуждение здесь).

Храните все свои аргументы в виде МАССЫ ТАБЛ.

Пример скажем, что обычно вы вызываете свою функцию как

      def mainImage(fragCoord : vec2, iResolution : vec3, iTime : float) -> vec3:

вместо этого передайте один кортеж и распакуйте аргументы

      def mainImage(package_iter) -> vec3: 
    fragCoord=package_iter[0]  
    iResolution=package_iter[1]
    iTime=package_iter[2]

Создайте кортеж, используя цикл перед рукой

          package_iter = [] 
    iResolution = vec3(nx,ny,1)
    for j in range( (ny-1), -1, -1):
        for i in range( 0, nx, 1): 
            fragCoord : vec2 = vec2(i,j)
            time_elapsed_seconds = 10
            package_iter.append(  (fragCoord, iResolution, time_elapsed_seconds)  )

затем выполните все, используя карту, передав МАССИВ ШАБЛОНОВ

          array_rgb_values = []

    with concurrent.futures.ProcessPoolExecutor() as executor: 
        for  val in executor.map(mainImage, package_iter):          
            fragColor=val
            ir = clip( int(255* fragColor.r), 0, 255)
            ig = clip(int(255* fragColor.g), 0, 255)
            ib= clip(int(255* fragColor.b), 0, 255)

            array_rgb_values.append( (ir,ig,ib) )

Я знаю, что в Python есть * и ** для распаковки, но я их еще не пробовал. Также лучше использовать параллельные фьючерсы библиотеки более высокого уровня, чем библиотеку многопроцессорной обработки низкого уровня

Немного другой подход — этот пример предназначен для загрузки набора файлов.

      from multiprocessing import Pool

def download_file(batch):
    items_to_grab, var1, var2, etc. = batch
    ...

##batch yourself instead of using pool.map's chunk argument
batches = list(batch(items_to_grab, 200))
##now create tuples out of each chunk and add other variables you want to send along
batches =  [(x, var1, var2, etc.) for x in batches]
with Pool(5) as p:
     results = p.map(download_file, batches) 

Для python2 вы можете использовать этот трюк

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
Другие вопросы по тегам