Почему я могу передать метод экземпляра multiprocessing.Process, но не multiprocessing.Pool?
Я пытаюсь написать приложение, которое применяет функцию одновременно с multiprocessing.Pool
, Я хотел бы, чтобы эта функция была методом экземпляра (чтобы я мог определять его по-разному в разных подклассах). Это не представляется возможным; как я узнал в другом месте, очевидно, что связанные методы не могут быть засолены. Так почему начинается multiprocessing.Process
со связанным методом в качестве целевой работы? Следующий код:
import multiprocessing
def test1():
print "Hello, world 1"
def increment(x):
return x + 1
class testClass():
def process(self):
process1 = multiprocessing.Process(target=test1)
process1.start()
process1.join()
process2 = multiprocessing.Process(target=self.test2)
process2.start()
process2.join()
def pool(self):
pool = multiprocessing.Pool(1)
for answer in pool.imap(increment, range(10)):
print answer
print
for answer in pool.imap(self.square, range(10)):
print answer
def test2(self):
print "Hello, world 2"
def square(self, x):
return x * x
def main():
c = testClass()
c.process()
c.pool()
if __name__ == "__main__":
main()
Производит этот вывод:
Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
self.run()
File "C:\Python27\Lib\threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
Почему процессы могут обрабатывать связанные методы, но не пулы?
3 ответа
pickle
модуль обычно не может выбрать методы экземпляра:
>>> import pickle
>>> class A(object):
... def z(self): print "hi"
...
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/local/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects
Тем не менее multiprocessing
модуль имеет кастом Pickler
это добавляет код для включения этой функции:
#
# Try making some callable types picklable
#
from pickle import Pickler
class ForkingPickler(Pickler):
dispatch = Pickler.dispatch.copy()
@classmethod
def register(cls, type, reduce):
def dispatcher(self, obj):
rv = reduce(obj)
self.save_reduce(obj=obj, *rv)
cls.dispatch[type] = dispatcher
def _reduce_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
Вы можете повторить это с помощью copy_reg
Модуль, чтобы увидеть это работает на себя:
>>> import copy_reg
>>> def _reduce_method(m):
... if m.im_self is None:
... return getattr, (m.im_class, m.im_func.func_name)
... else:
... return getattr, (m.im_self, m.im_func.func_name)
...
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."
Когда вы используете Process.start
чтобы вызвать новый процесс в Windows, он выбирает все параметры, которые вы передали дочернему процессу, используя этот пользовательский ForkingPickler
:
#
# Windows
#
else:
# snip...
from pickle import load, HIGHEST_PROTOCOL
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
#
# We define a Popen class similar to the one from subprocess, but
# whose constructor takes a process object as its argument.
#
class Popen(object):
'''
Start a subprocess to run the code of a process object
'''
_tls = thread._local()
def __init__(self, process_obj):
# create pipe for communication with child
rfd, wfd = os.pipe()
# get handle for read end of the pipe and make it inheritable
...
# start process
...
# set attributes of self
...
# send information to child
prep_data = get_preparation_data(process_obj._name)
to_child = os.fdopen(wfd, 'wb')
Popen._tls.process_handle = int(hp)
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del Popen._tls.process_handle
to_child.close()
Обратите внимание на раздел "отправить информацию ребенку". Это использует dump
функция, которая использует ForkingPickler
для выбора данных, что означает, что ваш метод экземпляра может быть выбран.
Теперь, когда вы используете методы на multiprocessing.Pool
чтобы отправить метод дочернему процессу, он использует multiprocessing.Pipe
травить данные. В Python 2.7 multiprocessing.Pipe
реализовано в C, и вызывает pickle_dumps
напрямую, поэтому он не использует преимущества ForkingPickler
, Это означает, что выбор метода экземпляра не работает.
Однако, если вы используете copy_reg
зарегистрировать instancemethod
тип, а не обычай Pickler
, все попытки травления будут затронуты. Таким образом, вы можете использовать это для включения методов выборки экземпляра, даже через Pool
:
import multiprocessing
import copy_reg
import types
def _reduce_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)
def test1():
print("Hello, world 1")
def increment(x):
return x + 1
class testClass():
def process(self):
process1 = multiprocessing.Process(target=test1)
process1.start()
process1.join()
process2 = multiprocessing.Process(target=self.test2)
process2.start()
process2.join()
def pool(self):
pool = multiprocessing.Pool(1)
for answer in pool.imap(increment, range(10)):
print(answer)
print
for answer in pool.imap(self.square, range(10)):
print(answer)
def test2(self):
print("Hello, world 2")
def square(self, x):
return x * x
def main():
c = testClass()
c.process()
c.pool()
if __name__ == "__main__":
main()
Выход:
Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
1GOT (0, 5, (True, 6))
GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10
GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
GOT (1, 6, (True, 36))
36
GOT (1, 7, (True, 49))
49
GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None
Также обратите внимание, что в Python 3.x pickle
может изначально выбирать типы методов экземпляра, так что ни один из этих вещей больше не имеет значения.:)
Вот альтернатива, которую я иногда использую, и она работает в Python2.x:
Вы можете создать своего рода "псевдоним" верхнего уровня для методов экземпляра, которые принимают объект, методы экземпляра которого вы хотите запустить в пуле, и заставляют его вызывать методы экземпляра для вас:
import functools
import multiprocessing
def _instance_method_alias(obj, arg):
"""
Alias for instance method that allows the method to be called in a
multiprocessing pool
"""
obj.instance_method(arg)
return
class MyClass(object):
"""
Our custom class whose instance methods we want to be able to use in a
multiprocessing pool
"""
def __init__(self):
self.my_string = "From MyClass: {}"
def instance_method(self, arg):
"""
Some arbitrary instance method
"""
print(self.my_string.format(arg))
return
# create an object of MyClass
obj = MyClass()
# use functools.partial to create a new method that always has the
# MyClass object passed as its first argument
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj)
# create our list of things we will use the pool to map
l = [1,2,3]
# create the pool of workers
pool = multiprocessing.Pool()
# call pool.map, passing it the newly created function
pool.map(_bound_instance_method_alias, l)
# cleanup
pool.close()
pool.join()
Этот код производит этот вывод:
Из MyClass: 1
Из MyClass: 2
Из MyClass: 3
Одним из ограничений является то, что вы не можете использовать это для методов, которые изменяют объект. Каждый процесс получает копию объекта, для которого он вызывает методы, поэтому изменения не будут переданы обратно основному процессу. Если вам не нужно изменять объект из методов, которые вы вызываете, это может быть простым решением.
Вот простой способ работы в Python 2, просто оберните оригинальный метод экземпляра. Хорошо работает на MacOSX и Linux, не работает на Windows, протестирован Python 2.7
from multiprocessing import Pool
class Person(object):
def __init__(self):
self.name = 'Weizhong Tu'
def calc(self, x):
print self.name
return x ** 5
def func(x, p=Person()):
return p.calc(x)
pool = Pool()
print pool.map(func, range(10))