Почему я могу передать метод экземпляра multiprocessing.Process, но не multiprocessing.Pool?

Я пытаюсь написать приложение, которое применяет функцию одновременно с multiprocessing.Pool, Я хотел бы, чтобы эта функция была методом экземпляра (чтобы я мог определять его по-разному в разных подклассах). Это не представляется возможным; как я узнал в другом месте, очевидно, что связанные методы не могут быть засолены. Так почему начинается multiprocessing.Process со связанным методом в качестве целевой работы? Следующий код:

import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Производит этот вывод:

Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

Почему процессы могут обрабатывать связанные методы, но не пулы?

3 ответа

Решение

pickle модуль обычно не может выбрать методы экземпляра:

>>> import pickle
>>> class A(object):
...  def z(self): print "hi"
... 
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

Тем не менее multiprocessing модуль имеет кастом Pickler это добавляет код для включения этой функции:

#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)

Вы можете повторить это с помощью copy_reg Модуль, чтобы увидеть это работает на себя:

>>> import copy_reg
>>> def _reduce_method(m):
...     if m.im_self is None:
...         return getattr, (m.im_class, m.im_func.func_name)
...     else:
...         return getattr, (m.im_self, m.im_func.func_name)
... 
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."

Когда вы используете Process.start чтобы вызвать новый процесс в Windows, он выбирает все параметры, которые вы передали дочернему процессу, используя этот пользовательский ForkingPickler:

#
# Windows
#

else:
    # snip...
    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            ...
            # start process
            ...

            # set attributes of self
            ...

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

Обратите внимание на раздел "отправить информацию ребенку". Это использует dump функция, которая использует ForkingPickler для выбора данных, что означает, что ваш метод экземпляра может быть выбран.

Теперь, когда вы используете методы на multiprocessing.Pool чтобы отправить метод дочернему процессу, он использует multiprocessing.Pipe травить данные. В Python 2.7 multiprocessing.Pipe реализовано в C, и вызывает pickle_dumps напрямую, поэтому он не использует преимущества ForkingPickler, Это означает, что выбор метода экземпляра не работает.

Однако, если вы используете copy_reg зарегистрировать instancemethod тип, а не обычай Pickler, все попытки травления будут затронуты. Таким образом, вы можете использовать это для включения методов выборки экземпляра, даже через Pool:

import multiprocessing
import copy_reg
import types

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)

def test1():
    print("Hello, world 1")

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print(answer)
        print
        for answer in pool.imap(self.square, range(10)):
            print(answer)

    def test2(self):
        print("Hello, world 2")

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Выход:

Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
 1GOT (0, 5, (True, 6))

GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
 GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10

GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
 GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
 GOT (1, 6, (True, 36))
36
 GOT (1, 7, (True, 49))
49
 GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None

Также обратите внимание, что в Python 3.x pickle может изначально выбирать типы методов экземпляра, так что ни один из этих вещей больше не имеет значения.:)

Вот альтернатива, которую я иногда использую, и она работает в Python2.x:

Вы можете создать своего рода "псевдоним" верхнего уровня для методов экземпляра, которые принимают объект, методы экземпляра которого вы хотите запустить в пуле, и заставляют его вызывать методы экземпляра для вас:

import functools
import multiprocessing

def _instance_method_alias(obj, arg):
    """
    Alias for instance method that allows the method to be called in a 
    multiprocessing pool
    """
    obj.instance_method(arg)
    return

class MyClass(object):
    """
    Our custom class whose instance methods we want to be able to use in a 
    multiprocessing pool
    """

    def __init__(self):
        self.my_string = "From MyClass: {}"

    def instance_method(self, arg):
        """
        Some arbitrary instance method
        """

        print(self.my_string.format(arg))
        return

# create an object of MyClass
obj = MyClass()

# use functools.partial to create a new method that always has the 
# MyClass object passed as its first argument
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj)

# create our list of things we will use the pool to map
l = [1,2,3]

# create the pool of workers
pool = multiprocessing.Pool()

# call pool.map, passing it the newly created function
pool.map(_bound_instance_method_alias, l)

# cleanup
pool.close()
pool.join()

Этот код производит этот вывод:

Из MyClass: 1
Из MyClass: 2
Из MyClass: 3

Одним из ограничений является то, что вы не можете использовать это для методов, которые изменяют объект. Каждый процесс получает копию объекта, для которого он вызывает методы, поэтому изменения не будут переданы обратно основному процессу. Если вам не нужно изменять объект из методов, которые вы вызываете, это может быть простым решением.

Вот простой способ работы в Python 2, просто оберните оригинальный метод экземпляра. Хорошо работает на MacOSX и Linux, не работает на Windows, протестирован Python 2.7

from multiprocessing import Pool

class Person(object):
    def __init__(self):
        self.name = 'Weizhong Tu'

    def calc(self, x):
        print self.name
        return x ** 5


def func(x, p=Person()):
    return p.calc(x)


pool = Pool()
print pool.map(func, range(10))
Другие вопросы по тегам