Невозможно выбрать <type 'instancemethod'> при использовании многопроцессорной обработки Pool.map()
Я пытаюсь использовать multiprocessing
"s Pool.map()
Функция для разделения работы одновременно. Когда я использую следующий код, он работает нормально:
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
Однако, когда я использую его в более объектно-ориентированном подходе, он не работает. Это сообщение об ошибке:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
Это происходит, когда моя основная программа:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
и следующее мое someClass
учебный класс:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
Кто-нибудь знает, в чем может быть проблема, или простой способ ее обойти?
14 ответов
Проблема состоит в том, что многопроцессорная обработка должна выполнять сортировку между процессами, а связанные методы не могут быть выбраны. Обходной путь (независимо от того, считаете ли вы это "простым" или нет;-) - это добавить инфраструктуру в вашу программу, чтобы такие методы можно было выбрать, зарегистрировав ее с помощью метода стандартной библиотеки copy_reg.
Например, вклад Стивена Бетарда в этот поток (ближе к концу потока) демонстрирует один вполне работоспособный подход, позволяющий метод травления / расщепления через copy_reg
,
Все эти решения безобразны, потому что многопроцессорная обработка и выборка нарушены и ограничены, если вы не выйдете за пределы стандартной библиотеки.
Если вы используете вилку multiprocessing
называется pathos.multiprocesssing
, вы можете напрямую использовать классы и методы классов в многопроцессорных map
функции. Это потому что dill
используется вместо pickle
или же cPickle
, а также dill
может сериализовать почти все в Python.
pathos.multiprocessing
также обеспечивает асинхронную функцию отображения... и это может map
функции с несколькими аргументами (например, map(math.pow, [1,2,3], [4,5,6])
)
Смотрите: что могут делать мультипроцессор и укроп вместе?
и: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
И просто, чтобы быть явным, вы можете сделать именно то, что вы хотели сделать в первую очередь, и вы можете сделать это от переводчика, если хотите.
>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
Получите код здесь: https://github.com/uqfoundation/pathos
Вы также можете определить __call__()
метод внутри вашего someClass()
, который вызывает someClass.go()
а затем передать экземпляр someClass()
в бассейн. Этот объект является маринованным, и он прекрасно работает (для меня)...
Некоторые ограничения для решения Стивена Бетарда:
Когда вы регистрируете свой метод класса как функцию, деструктор вашего класса неожиданно вызывается каждый раз, когда заканчивается обработка вашего метода. Так что если у вас есть 1 экземпляр вашего класса, который вызывает n раз его метод, члены могут исчезнуть между двумя запусками, и вы можете получить сообщение malloc: *** error for object 0x...: pointer being freed was not allocated
(например, открыть файл участника) или pure virtual method called,
terminate called without an active exception
(что означает, что срок жизни объекта-члена, который я использовал, был короче, чем я думал). Я получил это при работе с n больше, чем размер пула. Вот короткий пример:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __del__(self):
print "... Destructor"
def process_obj(self, index):
print "object %d" % index
return "results"
pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
Выход:
Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
__call__
Метод не настолько эквивалентен, потому что [None,...] читается из результатов:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __call__(self, i):
self.process_obj(i)
def __del__(self):
print "... Destructor"
def process_obj(self, i):
print "obj %d" % i
return "result"
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !
Так что ни один из обоих методов не удовлетворяет...
Вы также можете определить __call__()
метод внутри вашего someClass()
, который вызывает someClass.go()
а затем передать экземпляр someClass()
в бассейн. Этот объект является маринованным, и он прекрасно работает (для меня)...
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
p = Pool(4)
sc = p.map(self, range(4))
print sc
def __call__(self, x):
return self.f(x)
sc = someClass()
sc.go()
Есть еще один ярлык, который вы можете использовать, хотя он может быть неэффективным в зависимости от того, что находится в вашем классе.
Как все говорили, проблема в том, что multiprocessing
код должен перебирать вещи, которые он отправляет подпроцессам, которые он запустил, а средство выбора не выполняет методы экземпляра.
Однако вместо отправки метода экземпляра вы можете отправить фактический экземпляр класса плюс имя вызываемой функции в обычную функцию, которая затем использует getattr
вызывать метод экземпляра, создавая связанный метод в Pool
подпроцесс. Это похоже на определение __call__
метод, за исключением того, что вы можете вызвать более одной функции-члена.
Кража кода @EricH. Из его ответа и его комментирование (я набрал его заново, поэтому все имена меняются и тому подобное, по какой-то причине это казалось проще, чем вырезать и вставить:-)) для иллюстрации всей магии:
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
Выходные данные показывают, что действительно, конструктор вызывается один раз (в исходном pid), а деструктор вызывается 9 раз (один раз для каждой сделанной копии = 2 или 3 раза за пул-рабочий процесс, в зависимости от необходимости, плюс один раз в оригинале процесс). Это часто нормально, как в этом случае, так как сборщик по умолчанию делает копию всего экземпляра и (частично) тайно повторно заполняет его - в этом случае делает:
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
Вот почему, хотя деструктор вызывается восемь раз в трех рабочих процессах, он каждый раз ведет отсчет от 1 до 0, но, конечно, вы все равно можете столкнуться с проблемами. При необходимости вы можете предоставить свой собственный __setstate__
:
def __setstate__(self, adict):
self.count = adict['count']
в этом случае, например.
Решение от parisjohn выше прекрасно работает со мной. Плюс код выглядит чистым и простым для понимания. В моем случае есть несколько функций для вызова с использованием Pool, поэтому я изменил код parisjohn чуть ниже. Я сделал вызов, чтобы иметь возможность вызывать несколько функций, и имена функций передаются в аргументе dict из go()
:
from multiprocessing import Pool
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def g(self, x):
return x*x+1
def go(self):
p = Pool(4)
sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
print sc
def __call__(self, x):
if x["func"]=="f":
return self.f(x["v"])
if x["func"]=="g":
return self.g(x["v"])
sc = someClass()
sc.go()
В этом простом случае, где someClass.f
не наследуя какие-либо данные из класса и не прикрепляя что-либо к классу, возможное решение было бы выделить f
так что его можно мариновать:
import multiprocessing
def f(x):
return x*x
class someClass(object):
def __init__(self):
pass
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
Я столкнулся с той же проблемой, но обнаружил, что существует кодировщик JSON, который можно использовать для перемещения этих объектов между процессами.
from pyVmomi.VmomiSupport import VmomiJSONEncoder
Используйте это, чтобы создать свой список:
jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)
Затем в отображаемой функции используйте это, чтобы восстановить объект:
pfVmomiObj = json.loads(jsonSerialized)
Потенциально тривиальным решением этого является переход на использование multiprocessing.dummy
, Это основанная на потоках реализация многопроцессорного интерфейса, которая, похоже, не имеет этой проблемы в Python 2.7. У меня нет большого опыта здесь, но это быстрое изменение импорта позволило мне вызвать apply_async для метода класса.
Несколько хороших ресурсов на multiprocessing.dummy
:
Почему бы не использовать отдельный func?
def func(*args, **kwargs):
return inst.method(args, kwargs)
print pool.map(func, arr)
Обновление: по состоянию на день написания, namedTuples можно выбирать (начиная с Python 2.7)
Проблема здесь в том, что дочерние процессы не могут импортировать класс объекта - в этом случае, класс P-, в случае многомодельного проекта класс P должен импортироваться везде, где используется дочерний процесс
быстрый обходной путь - сделать его импортируемым, воздействуя на глобальные переменные ()
globals()["P"] = P
pathos.multiprocessing
работал на меня.
Оно имеет
pool
метод и сериализует все в отличие от
multiprocessing
import pathos.multiprocessing as mp
pool = mp.Pool(processes=2)
Нет необходимости даже устанавливать полный пафосный пакет.
На самом деле нужен только укроп (pip install dill
), а затем переопределите многопроцессорный Pickler с помощью укропного:
dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
multiprocessing.reduction.ForkingPickler = dill.Pickler
multiprocessing.reduction.dump = dill.dump
Этот ответ был заимствован из /questions/23327379/chto-mogut-delat-multiprotsessor-i-ukrop-vmeste/59165077#59165077