Как передать функцию с более чем одним аргументом в python concurrent.futures.ProcessPoolExecutor.map()?

Мне бы хотелось concurrent.futures.ProcessPoolExecutor.map() вызвать функцию, состоящую из 2 или более аргументов. В приведенном ниже примере я прибегнул к использованию lambda функция и определение ref как массив равного размера numberlist с одинаковым значением.

1-й вопрос: есть ли лучший способ сделать это? В случае, когда размер списка нумерации может составлять от миллиона до миллиарда элементов, следовательно, размер ссылки должен следовать за списком номеров, этот подход излишне занимает драгоценную память, чего я хотел бы избежать. Я сделал это, потому что я прочитал map функция прекратит свое отображение, пока не будет достигнут самый короткий конец массива.

import concurrent.futures as cf

nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3


def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
    print(n)
    if str(ref[0]) in n:
        print('match')

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
        print(type(n))
        print(n)
        if str(ref[0]) in n:
            print('match')

Запустив код выше, я обнаружил, что map Функция смогла достичь желаемого результата. Тем не менее, когда я передал те же условия в concurrent.futures.ProcessPoolExecutor.map(), python3.5 не удалось с этой ошибкой:

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed

Вопрос 2: Почему возникла эта ошибка и как я могу получить concurrent.futures.ProcessPoolExecutor.map () для вызова функции с более чем одним аргументом?

3 ответа

Решение

Чтобы ответить на ваш второй вопрос первым, вы получаете исключение, потому что lambda Функция, подобная той, которую вы используете, не является киркой. Поскольку Python использует pickle протокол для сериализации данных, передаваемых между основным процессом и ProcessPoolExecutorВ рабочих процессах это проблема. Не понятно, почему вы используете lambda совсем. Лямбда, которую вы имели, принимает два аргумента, как и оригинальная функция. Вы могли бы использовать _findmatch прямо вместо lambda и это должно работать.

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_findmatch, numberlist, ref):
        ...

Что касается первого вопроса о передаче второго, постоянного аргумента без создания гигантского списка, вы можете решить это несколькими способами. Одним из подходов может быть использование itertools.repeat создать повторяемый объект, который повторяется одно и то же значение навсегда при повторении.

Но лучшим подходом, вероятно, было бы написать дополнительную функцию, которая передаст вам постоянный аргумент. (Возможно, именно поэтому вы пытались использовать lambda функция?) Должна работать, если используемая функция доступна в пространстве имен верхнего уровня модуля:

def _helper(x):
    return _findmatch(x, 5)

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_helper, numberlist):
        ...

(1) Нет необходимости составлять список. Ты можешь использовать itertools.repeat создать итератор, который просто повторяет некоторое значение.

(2) Вам нужно передать именованную функцию map потому что он будет передан в подпроцесс для выполнения. map использует протокол рассола для отправки вещей, лямбда не может быть засолена, и поэтому они не могут быть частью карты. Но это совершенно не нужно. Все, что делала ваша лямбда, - это вызывала функцию с двумя параметрами и двумя параметрами. Удалите это полностью.

Рабочий код

import concurrent.futures as cf
import itertools

nmax = 10
numberlist = range(nmax)
workers = 3

def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
        print(type(n))
        print(n)
        #if str(ref[0]) in n:
        #    print('match')

Что касается вашего первого вопроса, правильно ли я понимаю, что вы хотите передать аргумент, значение которого определяется только во время вызова map но постоянная для всех экземпляров сопоставленной функции? Если это так, я бы сделал map с функцией, полученной из "шаблонной функции" со вторым аргументом (ref в вашем примере) запекать с помощью functools.partial:

from functools import partial
refval = 5

def _findmatch(ref, listnumber):  # arguments swapped
    ...

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(partial(_findmatch, refval), numberlist):
        ...

Число рейнольдса Вопрос 2, первая часть: я не нашел точный фрагмент кода, который пытается выделить (сериализовать) функцию, которая должна затем выполняться параллельно, но звучит естественно, что это должно произойти - не только аргументы, но и функция должна быть как-то передана работникам, и, вероятно, она должна быть сериализована для этой передачи. Дело в том, что partial функции можно мариновать в то время как lambdas не может быть упомянуто в другом месте, например, здесь: /questions/31455277/python-pochemu-neobhodim-functoolspartial/31455287#31455287.

Число рейнольдса вопрос 2, вторая часть: если вы хотите вызвать функцию с более чем одним аргументом в ProcessPoolExecutor.mapвы бы передали ему функцию в качестве первого аргумента, за которой следовала бы итерация первых аргументов для функции, за которой следовали бы итерация ее вторых аргументов и т. д.

for n in executor.map(_findmatch, numberlist, ref):
    ...
Другие вопросы по тегам