Многопроцессорные менеджеры и пользовательские классы

Я не знаю почему, но я получаю эту странную ошибку всякий раз, когда пытаюсь перейти к методу общего объекта общего класса общего объекта. Версия Python: 3.6.3

Код:

from multiprocessing.managers import SyncManager

class MyManager(SyncManager): pass
class MyClass: pass

class Wrapper:
    def set(self, ent):
        self.ent = ent

MyManager.register('MyClass', MyClass)
MyManager.register('Wrapper', Wrapper)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()

    try:
        obj = manager.MyClass()
        lst = manager.list([1,2,3])

        collection = manager.Wrapper()
        collection.set(lst) # executed fine
        collection.set(obj) # raises error
    except Exception as e:
        raise

Ошибка:

---------------------------------------------------------------------------
Traceback (most recent call last):
  File "D:\Program Files\Python363\lib\multiprocessing\managers.py", line 228, in serve_client
    request = recv()
  File "D:\Program Files\Python363\lib\multiprocessing\connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "D:\Program Files\Python363\lib\multiprocessing\managers.py", line 881, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'
---------------------------------------------------------------------------

В чем здесь проблема?

4 ответа

Решение

Нашел временное решение здесь. Мне удалось это исправить, добавив необходимое ключевое слово в инициализатор AutoProxy в multiprocessing\ Manager.py. Однако я не знаю, отвечает ли этот kwarg за что-либо.

Я тоже столкнулся с этим, как уже отмечалось, это ошибка в Python multiprocessing(см. проблему № 30256), и запрос на перенос, исправляющий это, еще не был объединен.

Помимо внесения исправлений в локальную установку вручную, у вас есть еще три варианта:

  • вы могли бы использовать MakeProxyType() вызываемый, чтобы указать ваш прокси-тип, не полагаясь на AutoProxy генератор прокси,
  • вы можете определить собственный прокси-класс,
  • вы можете исправить ошибку с помощью обезьяны

Я опишу эти варианты ниже, объяснив, что AutoProxy делает:

В чем смысл AutoProxy класс

Многопроцессорность ManagerШаблон предоставляет доступ к общим значениям, помещая все значения в один и тот же выделенный процесс "сервера канонических значений". Все другие процессы (клиенты) общаются с сервером через прокси-серверы, которые затем передают сообщения назад и вперед с сервером.

Однако серверу необходимо знать, какие методы приемлемы для данного типа объекта, чтобы клиенты могли создавать прокси-объект с помощью тех же методов. Это то, что AutoProxyобъект предназначен для. Когда клиенту нужен новый экземпляр вашего зарегистрированного класса, прокси по умолчанию, который создает клиент, является AutoProxy, который затем просит сервер сообщить ему, какие методы он может использовать.

Как только он получает имена методов, он вызывает MakeProxyType для создания нового класса, а затем создает экземпляр для возврата этого класса.

Все это откладывается до тех пор, пока вам действительно не понадобится экземпляр проксируемого типа, поэтому в принципе AutoProxyэкономит немного памяти, если вы не используете определенные классы, которые вы зарегистрировали. Однако это очень мало памяти, а обратная сторона - то, что этот процесс должен происходить в каждом клиентском процессе.

Эти прокси-объекты используют подсчет ссылок, чтобы отслеживать, когда сервер может удалить каноническое значение. Это та часть, которая сломана в AutoProxyвызываемый; новый аргумент передается типу прокси, чтобы отключить подсчет ссылок, когда объект прокси создается в процессе сервера, а не на клиенте, но AutoProxy Тип не был обновлен для поддержки этого.

Итак, как это исправить? Вот эти 3 варианта:

Использовать MakeProxyType() вызываемый

Как уже упоминалось, AutoProxy на самом деле просто вызов (через сервер) для получения общедоступных методов типа и вызов MakeProxyType(). Вы можете просто сделать эти звонки самостоятельно при регистрации.

Итак, вместо

       from multiprocessing.managers import SyncManager
SyncManager.register("YourType", YourType)

использовать

       from multiprocessing.managers import SyncManager, MakeProxyType, public_methods
#               arguments:    classname,  sequence of method names
YourTypeProxy = MakeProxyType("YourType", public_methods(YourType))
SyncManager.register("YourType", YourType, YourTypeProxy)

Не стесняйтесь вставлять MakeProxyType() звоните туда.

Если бы вы использовали exposed аргумент SyncManager.register(), вы должны передать эти имена в MakeProxyType вместо:

       # SyncManager.register("YourType", YourType, exposed=("foo", "bar"))
# becomes
YourTypeProxy = MakeProxyType("YourType", ("foo", "bar"))
SyncManager.register("YourType", YourType, YourTypeProxy)

Вам придется сделать это и для всех предварительно зарегистрированных типов:

       from multiprocessing.managers import SyncManager, AutoProxy, MakeProxyType, public_methods

registry = SyncManager._registry
for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items():
    if proxytype is not AutoProxy:
        continue
    create_method = hasattr(managers.SyncManager, typeid)
    if exposed is None:
        exposed = public_methods(callable) 
    SyncManager.register(
        typeid,
        callable=callable,
        exposed=exposed,
        method_to_typeid=method_to_typeid,
        proxytype=MakeProxyType(f"{typeid}Proxy", exposed),
        create_method=create_method,
    )

Создавайте собственные прокси

Вы не можете рассчитывать на то, что многопроцессорность создаст для вас прокси. Вы можете просто написать свой собственный. Прокси-сервер используется во всех процессах, за исключением специального серверного процесса "управляемые значения", и он должен передавать сообщения туда и обратно. Это, конечно, не вариант для уже зарегистрированных типов, но я упоминаю об этом здесь, потому что для ваших собственных типов это дает возможности для оптимизации.

Обратите внимание, что у вас должны быть методы для всех взаимодействий, которые должны вернуться к экземпляру "канонического" значения, поэтому вам нужно будет использовать свойства для обработки обычных атрибутов или добавить __getattr__, __setattr__ и __delattr__ методы по мере необходимости.

Преимущество состоит в том, что вы можете очень точно контролировать, какие методы действительно необходимы для обмена данными с серверным процессом; в моем конкретном примере мой прокси-класс кэширует неизменяемую информацию (значения никогда не изменятся после создания объекта), но часто использовались. Это включает значение флага, которое контролирует, будут ли что-то делать другие методы, поэтому прокси-сервер может просто проверить значение флага и не разговаривать с серверным процессом, если он не установлен. Что-то вроде этого:

       class FooProxy(BaseProxy):
    # what methods the proxy is allowed to access through calls
    _exposed_ = ("__getattribute__", "expensive_method", "spam")

    @property
    def flag(self):
        try:
            v = self._flag
        except AttributeError:
            # ask for the value from the server, "realvalue.flag"
            # use __getattribute__ because it's an attribute, not a property
            v = self._flag = self._callmethod("__getattribute__", ("flag",))
        return flag

    def expensive_method(self, *args, **kwargs):
        if self.flag:   # cached locally!
            return self._callmethod("expensive_method", args, kwargs)

    def spam(self, *args, **kwargs):
        return self._callmethod("spam", args, kwargs

SyncManager.register("Foo", Foo, FooProxy)

Потому как MakeProxyType() возвращает BaseProxy подкласс, вы можете объединить этот класс с настраиваемым подклассом, избавив себя от необходимости писать любые методы, которые просто состоят из return self._callmethod(...):

       # a base class with the methods generated for us. The second argument
# doubles as the 'permitted' names, stored as _exposed_
FooProxyBase = MakeProxyType(
    "FooProxyBase",
    ("__getattribute__", "expensive_method", "spam"),
)

class FooProxy(FooProxyBase):
    @property
    def flag(self):
        try:
            v = self._flag
        except AttributeError:
            # ask for the value from the server, "realvalue.flag"
            # use __getattribute__ because it's an attribute, not a property
            v = self._flag = self._callmethod("__getattribute__", ("flag",))
        return flag

    def expensive_method(self, *args, **kwargs):
        if self.flag:   # cached locally!
            return self._callmethod("expensive_method", args, kwargs)

    def spam(self, *args, **kwargs):
        return self._callmethod("spam", args, kwargs

SyncManager.register("Foo", Foo, FooProxy)

Опять же, это не решит проблему со стандартными типами, вложенными в другие прокси-значения.

Применить обезьяну

Я использую это, чтобы исправить AutoProxyвызываемый, это должно автоматически избегать исправления при запуске версии Python, в которой исправление уже было применено к исходному коду:

       # Backport of https://github.com/python/cpython/pull/4819
# Improvements to the Manager / proxied shared values code
# broke handling of proxied objects without a custom proxy type,
# as the AutoProxy function was not updated.
#
# This code adds a wrapper to AutoProxy if it is missing the
# new argument.

import logging
from inspect import signature
from functools import wraps
from multiprocessing import managers


logger = logging.getLogger(__name__)
orig_AutoProxy = managers.AutoProxy


@wraps(managers.AutoProxy)
def AutoProxy(*args, incref=True, manager_owned=False, **kwargs):
    # Create the autoproxy without the manager_owned flag, then
    # update the flag on the generated instance. If the manager_owned flag
    # is set, `incref` is disabled, so set it to False here for the same
    # result.
    autoproxy_incref = False if manager_owned else incref
    proxy = orig_AutoProxy(*args, incref=autoproxy_incref, **kwargs)
    proxy._owned_by_manager = manager_owned
    return proxy


def apply():
    if "manager_owned" in signature(managers.AutoProxy).parameters:
        return

    logger.debug("Patching multiprocessing.managers.AutoProxy to add manager_owned")
    managers.AutoProxy = AutoProxy

    # re-register any types already registered to SyncManager without a custom
    # proxy type, as otherwise these would all be using the old unpatched AutoProxy
    SyncManager = managers.SyncManager
    registry = managers.SyncManager._registry
    for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items():
        if proxytype is not orig_AutoProxy:
            continue
        create_method = hasattr(managers.SyncManager, typeid)
        SyncManager.register(
            typeid,
            callable=callable,
            exposed=exposed,
            method_to_typeid=method_to_typeid,
            create_method=create_method,
        )

Импортируйте вышеуказанное и вызовите apply() функция для исправления multiprocessing. Сделайте это перед запуском сервера-менеджера!

Решение для редактирования многопроцессорного исходного кода

Оригинальный ответ Sergey Dylda требует, чтобы вы отредактировали многопроцессорный исходный код следующим образом:

  1. Найдите ваш многопроцессорный пакет (мой, установленный через Anaconda, был в /anaconda3/lib/python3.6/multiprocessing).
  2. открыто managers.py
  3. Добавьте ключевой аргумент manager_owned=True к AutoProxy функция.

Оригинальный АвтоПрокси:

def AutoProxy(token, serializer, manager=None, authkey=None,
          exposed=None, incref=True):
    ...

Отредактированный AutoProxy:

def AutoProxy(token, serializer, manager=None, authkey=None,
          exposed=None, incref=True, manager_owned=True):
    ...

Решение с помощью кода, во время выполнения

Мне удалось разрешить неожиданное исключение аргумента TypeError ключевого слова без непосредственного редактирования исходного кода многопроцессорной обработки, добавив вместо этого несколько строк кода, в которых я использую менеджеры многопроцессорной обработки:

import multiprocessing

# Backup original AutoProxy function
backup_autoproxy = multiprocessing.managers.AutoProxy

# Defining a new AutoProxy that handles unwanted key argument 'manager_owned'
def redefined_autoproxy(token, serializer, manager=None, authkey=None,
          exposed=None, incref=True, manager_owned=True):
    # Calling original AutoProxy without the unwanted key argument
    return backup_autoproxy(token, serializer, manager, authkey,
                     exposed, incref)

# Updating AutoProxy definition in multiprocessing.managers package
multiprocessing.managers.AutoProxy = redefined_autoproxy

Если кто-то столкнулся с такими ошибками: PickleError: Can't pickle <class 'multiprocessing.managers.xxxx'>: поиск атрибута xxxx в multiprocessing.managers не удался после реализации отличного решения от Martijn, вы можете попробовать этот патч:

В отличие от автоматически сгенерированногоAutoProxyэкземпляр, прокси-класс, созданныйMakeProxyTypeнет вmultiprocessing.managersпространство имен. Поэтому вам нужно добавить его в пространство именsetattr, так:

      import multiprocessing.managers as mms
from multiprocessing.managers import SyncManager, MakeProxyType, public_methods

TaskProxy = MakeProxyType('Task', public_methods(Task))
setattr(mms, 'Task', TaskProxy)
SyncManager.register('Task', Task, TaskProxy)

TaskProxy — это создаваемый вами прокси-класс. Вам нужно использовать setattr, чтобы добавить его в пространство имен multiprocessing.managers. Тогда это должно работать.

Другие вопросы по тегам