Как передать функцию с более чем одним аргументом в python concurrent.futures.ProcessPoolExecutor.map()?
Мне бы хотелось concurrent.futures.ProcessPoolExecutor.map()
вызвать функцию, состоящую из 2 или более аргументов. В приведенном ниже примере я прибегнул к использованию lambda
функция и определение ref
как массив равного размера numberlist
с одинаковым значением.
1-й вопрос: есть ли лучший способ сделать это? В случае, когда размер списка нумерации может составлять от миллиона до миллиарда элементов, следовательно, размер ссылки должен следовать за списком номеров, этот подход излишне занимает драгоценную память, чего я хотел бы избежать. Я сделал это, потому что я прочитал map
функция прекратит свое отображение, пока не будет достигнут самый короткий конец массива.
import concurrent.futures as cf
nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3
def _findmatch(listnumber, ref):
print('def _findmatch(listnumber, ref):')
x=''
listnumber=str(listnumber)
ref = str(ref)
print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
if ref in listnumber:
x = listnumber
print('x = {0}'.format(x))
return x
a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
print(n)
if str(ref[0]) in n:
print('match')
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
#for n in executor.map(_findmatch, numberlist):
for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
print(type(n))
print(n)
if str(ref[0]) in n:
print('match')
Запустив код выше, я обнаружил, что map
Функция смогла достичь желаемого результата. Тем не менее, когда я передал те же условия в concurrent.futures.ProcessPoolExecutor.map(), python3.5 не удалось с этой ошибкой:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
obj = ForkingPickler.dumps(obj)
File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed
Вопрос 2: Почему возникла эта ошибка и как я могу получить concurrent.futures.ProcessPoolExecutor.map () для вызова функции с более чем одним аргументом?
3 ответа
Чтобы ответить на ваш второй вопрос первым, вы получаете исключение, потому что lambda
Функция, подобная той, которую вы используете, не является киркой. Поскольку Python использует pickle
протокол для сериализации данных, передаваемых между основным процессом и ProcessPoolExecutor
В рабочих процессах это проблема. Не понятно, почему вы используете lambda
совсем. Лямбда, которую вы имели, принимает два аргумента, как и оригинальная функция. Вы могли бы использовать _findmatch
прямо вместо lambda
и это должно работать.
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(_findmatch, numberlist, ref):
...
Что касается первого вопроса о передаче второго, постоянного аргумента без создания гигантского списка, вы можете решить это несколькими способами. Одним из подходов может быть использование itertools.repeat
создать повторяемый объект, который повторяется одно и то же значение навсегда при повторении.
Но лучшим подходом, вероятно, было бы написать дополнительную функцию, которая передаст вам постоянный аргумент. (Возможно, именно поэтому вы пытались использовать lambda
функция?) Должна работать, если используемая функция доступна в пространстве имен верхнего уровня модуля:
def _helper(x):
return _findmatch(x, 5)
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(_helper, numberlist):
...
(1) Нет необходимости составлять список. Ты можешь использовать itertools.repeat
создать итератор, который просто повторяет некоторое значение.
(2) Вам нужно передать именованную функцию map
потому что он будет передан в подпроцесс для выполнения. map
использует протокол рассола для отправки вещей, лямбда не может быть засолена, и поэтому они не могут быть частью карты. Но это совершенно не нужно. Все, что делала ваша лямбда, - это вызывала функцию с двумя параметрами и двумя параметрами. Удалите это полностью.
Рабочий код
import concurrent.futures as cf
import itertools
nmax = 10
numberlist = range(nmax)
workers = 3
def _findmatch(listnumber, ref):
print('def _findmatch(listnumber, ref):')
x=''
listnumber=str(listnumber)
ref = str(ref)
print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
if ref in listnumber:
x = listnumber
print('x = {0}'.format(x))
return x
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
#for n in executor.map(_findmatch, numberlist):
for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
print(type(n))
print(n)
#if str(ref[0]) in n:
# print('match')
Что касается вашего первого вопроса, правильно ли я понимаю, что вы хотите передать аргумент, значение которого определяется только во время вызова map
но постоянная для всех экземпляров сопоставленной функции? Если это так, я бы сделал map
с функцией, полученной из "шаблонной функции" со вторым аргументом (ref
в вашем примере) запекать с помощью functools.partial
:
from functools import partial
refval = 5
def _findmatch(ref, listnumber): # arguments swapped
...
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(partial(_findmatch, refval), numberlist):
...
Число рейнольдса Вопрос 2, первая часть: я не нашел точный фрагмент кода, который пытается выделить (сериализовать) функцию, которая должна затем выполняться параллельно, но звучит естественно, что это должно произойти - не только аргументы, но и функция должна быть как-то передана работникам, и, вероятно, она должна быть сериализована для этой передачи. Дело в том, что partial
функции можно мариновать в то время как lambda
s не может быть упомянуто в другом месте, например, здесь: /questions/31455277/python-pochemu-neobhodim-functoolspartial/31455287#31455287.
Число рейнольдса вопрос 2, вторая часть: если вы хотите вызвать функцию с более чем одним аргументом в ProcessPoolExecutor.map
вы бы передали ему функцию в качестве первого аргумента, за которой следовала бы итерация первых аргументов для функции, за которой следовали бы итерация ее вторых аргументов и т. д.
for n in executor.map(_findmatch, numberlist, ref):
...