Совместное использование сложного объекта между процессами Python?

У меня есть довольно сложный объект Python, который мне нужно разделить между несколькими процессами. Я запускаю эти процессы, используя multiprocessing.Process, Когда я делюсь объектом с multiprocessing.Queue а также multiprocessing.Pipe в ней они делятся просто отлично. Но когда я пытаюсь поделиться объектом с другими объектами, не относящимися к мультипроцессорному модулю, кажется, что Python разветвляет эти объекты. Это правда?

Я пытался использовать многопроцессорность. Значение. Но я не уверен, какой тип должен быть? Мой объектный класс называется MyClass. Но когда я пытаюсь multiprocess.Value(MyClass, instance), это терпит неудачу с:

TypeError: this type has no size

Есть идеи, что происходит?

5 ответов

Решение

Вы можете сделать это, используя многопроцессорные классы Python "Manager" и прокси-класс, который вы определяете. Из документов Python: http://docs.python.org/library/multiprocessing.html

Что вы хотите сделать, это определить прокси-класс для своего пользовательского объекта, а затем поделиться объектом с помощью "Удаленного менеджера" - посмотрите на примеры на той же странице связанных ссылок для "Удаленного менеджера", где документы показывают, как делиться удаленная очередь. Вы будете делать то же самое, но ваш вызов your_manager_instance.register() включит ваш собственный прокси-класс в список аргументов.

Таким образом, вы настраиваете сервер для совместного использования пользовательского объекта с пользовательским прокси. Вашим клиентам необходим доступ к серверу (опять же, посмотрите отличные примеры документации о том, как настроить клиент / серверный доступ к удаленной очереди, но вместо того, чтобы делиться очередью, вы делитесь доступом к вашему определенному классу).

После долгих исследований и испытаний я обнаружил, что "Менеджер" выполняет эту работу на уровне не сложного объекта.

Код ниже показывает, что объект inst распределяется между процессами, что означает свойство var из inst изменяется снаружи, когда его изменяет дочерний процесс.

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class SimpleClass(object):
    def __init__(self):
        self.var = 0

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


def change_obj_value(obj):
    obj.set(100)


if __name__ == '__main__':
    BaseManager.register('SimpleClass', SimpleClass)
    manager = BaseManager()
    manager.start()
    inst = manager.SimpleClass()

    p = Process(target=change_obj_value, args=[inst])
    p.start()
    p.join()

    print inst                    # <__main__.SimpleClass object at 0x10cf82350>
    print inst.get()              # 100

Хорошо, приведенного выше кода достаточно, если вам нужно только поделиться простыми объектами.

Почему нет комплекса? Потому что он может потерпеть неудачу, если ваш объект вложен (объект внутри объекта):

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class GetSetter(object):
    def __init__(self):
        self.var = None

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


class ChildClass(GetSetter):
    pass

class ParentClass(GetSetter):
    def __init__(self):
        self.child = ChildClass()
        GetSetter.__init__(self)

    def getChild(self):
        return self.child


def change_obj_value(obj):
    obj.set(100)
    obj.getChild().set(100)


if __name__ == '__main__':
    BaseManager.register('ParentClass', ParentClass)
    manager = BaseManager()
    manager.start()
    inst2 = manager.ParentClass()

    p2 = Process(target=change_obj_value, args=[inst2])
    p2.start()
    p2.join()

    print inst2                    # <__main__.ParentClass object at 0x10cf82350>
    print inst2.getChild()         # <__main__.ChildClass object at 0x10cf6dc50>
    print inst2.get()              # 100
    #good!

    print inst2.getChild().get()   # None
    #bad! you need to register child class too but there's almost no way to do it
    #even if you did register child class, you may get PicklingError :)

Я думаю, что основная причина такого поведения заключается в том, что Manager это просто моноблок, основанный на низкоуровневых инструментах связи, таких как труба / очередь.

Таким образом, этот подход не рекомендуется для многопроцессорных систем. Всегда лучше, если вы можете использовать низкоуровневые инструменты, такие как блокировка / семафор / труба / очередь, или высокоуровневые инструменты, такие как очередь Redis или публикация / подписка Redis для сложных случаев использования (только моя рекомендация, lol).

В Python 3.6 документы говорят:

Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, общий объект-контейнер, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться с помощью SyncManager.

Пока экземпляры создаются через SyncManager, вы должны иметь возможность ссылаться на объекты друг на друга. Однако динамическое создание одного типа объекта в методах другого типа объекта может оказаться невозможным или очень сложным.

Изменить: я наткнулся на эту проблему Менеджеры многопроцессорной обработки и настраиваемые классы с python 3.6.5 и 3.6.7. Нужно проверить Python 3.7

Изменить 2: из-за некоторых других проблем я в настоящее время не могу проверить это с помощью python3.7. Обходной путь, представленный в /questions/28622622/mnogoprotsessornyie-menedzheryi-i-polzovatelskie-klassyi/28622635#28622635, отлично подходит для меня

Вот пакет Python, который я сделал только для этого (разделение сложных объектов между процессами).

Git: https://github.com/dRoje/pipe-proxy

Идея в том, что вы создаете прокси для своего объекта и передаете его процессу. Затем вы используете прокси, как у вас есть ссылка на исходный объект. Хотя вы можете использовать только вызовы методов, доступ к переменным объекта осуществляется через сеттеры и геттеры.

Скажем, у нас есть объект под названием "пример", создать прокси и прослушиватель прокси очень просто:

from pipeproxy import proxy 
example = Example() 
exampleProxy, exampleProxyListener = proxy.createProxy(example) 

Теперь вы отправляете прокси другому процессу.

p = Process(target=someMethod, args=(exampleProxy,)) p.start()

Используйте его в другом процессе, как если бы вы использовали исходный объект (пример):

def someMethod(exampleProxy):
    ...
    exampleProxy.originalExampleMethod()
    ...

Но вы должны слушать это в основном процессе:

exampleProxyListener.listen()

Узнайте больше и найдите примеры здесь:

http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing/

Я попытался использовать BaseManager и зарегистрировать свой настроенный класс, чтобы сделать его счастливым, и получить проблему с вложенным классом, как Том уже упоминал выше.

Я думаю, что основная причина не имеет отношения к вложенному классу, как было сказано, но механизм коммуникации, который питон использует на низком уровне. Причина в том, что python использует некоторый механизм связи, подобный сокету, для синхронизации изменения настроенного класса в процессе сервера на низком уровне. Я думаю, что он инкапсулирует некоторые методы rpc, делает его просто прозрачным для пользователя, как если бы он вызывал локальные методы объекта вложенного класса.

Поэтому, когда вы хотите изменить, получить свои собственные или сторонние объекты, вы должны определить некоторые интерфейсы в своих процессах, чтобы взаимодействовать с ним, а не напрямую получать или устанавливать значения.

Тем не менее, работая с вложенными объектами во вложенных объектах, можно игнорировать проблемы, упомянутые выше, так же, как вы делаете это в своей обычной подпрограмме, потому что ваши вложенные объекты в зарегистрированном классе больше не являются прокси-объектами, над которыми операция больше не будет проходить процедуру обмена данными через сокеты и будет локализована.

Вот рабочий код, который я написал для решения проблемы.

from multiprocessing import Process, Manager, Lock
from multiprocessing.managers import BaseManager
import numpy as np

class NestedObj(object):
       def __init__(self):
                self.val = 1

class CustomObj(object):
        def __init__(self, numpy_obj):
                self.numpy_obj = numpy_obj
                self.nested_obj = NestedObj()

        def set_value(self, p, q, v):
                self.numpy_obj[p, q] = v

        def get_obj(self):
                return self.numpy_obj

        def get_nested_obj(self):
                return self.nested_obj.val

class CustomProcess(Process):
        def __init__(self, obj, p, q, v):
                super(CustomProcess, self).__init__()
                self.obj = obj
                self.index = p, q
                self.v = v

        def run(self):
                self.obj.set_value(*self.index, self.v)



if __name__=="__main__":
        BaseManager.register('CustomObj', CustomObj)
        manager = BaseManager()
        manager.start()
        data = [[0 for x in range(10)] for y in range(10)]
        matrix = np.matrix(data)
        custom_obj = manager.CustomObj(matrix)
        print(custom_obj.get_obj())
        process_list = []
        for p in range(10):
                for q in range(10):
                        proc = CustomProcess(custom_obj, p, q, 10*p+q)
                        process_list.append(proc)
        for x in range(100):
                process_list[x].start()
        for x in range(100):
                process_list[x].join()
        print(custom_obj.get_obj())
        print(custom_obj.get_nested_obj())

Чтобы сохранить некоторые головные боли с общими ресурсами, вы можете попытаться собрать данные, которым требуется доступ к одноэлементному ресурсу, в операторе возврата функции, которая отображается, например, pool.imap_unordered и затем далее обрабатывать его в цикле, который извлекает частичные результаты:

for result in in pool.imap_unordered(process_function, iterable_data):
    do_something(result)

Если будет возвращено не так много данных, то при этом может не потребоваться много времени.

Другие вопросы по тегам