Вызвать многопроцессорность в методе класса Python

Изначально у меня есть класс для хранения некоторых обработанных значений и их повторного использования с другими методами.

Проблема заключается в том, что когда я пытался разделить метод класса на несколько процессов для ускорения, процессы, порожденные Python, но это, похоже, не сработало (как я видел в диспетчере задач, когда выполнялся только 1 процесс), и результат никогда не доставлялся.

Я сделал пару поисков и обнаружил, что pathos.multiprocessing может сделать это вместо этого, но мне интересно, может ли стандартная библиотека решить эту проблему?

from multiprocessing import Pool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return t

a = A(2)

a.run(list(range(10)))

2 ответа

Решение

Ваш код не работает, так как он не может pickle метод экземпляра (self.cal), что Python пытается сделать, когда вы порождаете несколько процессов, сопоставляя их multiprocessing.Pool (ну, есть способ сделать это, но он слишком запутанный и в любом случае не очень полезен) - поскольку нет доступа к разделяемой памяти, он должен "упаковать" данные и отправить их порожденному процессу для распаковки. То же самое случилось бы с вами, если бы вы попытались засолить a пример.

Единственный доступ к общей памяти, доступный в multiprocessing пакет малоизвестен multiprocessing.pool.ThreadPool так что если вы действительно хотите сделать это:

from multiprocessing.pool import ThreadPool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = ThreadPool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Но это не даст вам распараллеливания, поскольку оно по существу отображается на ваши обычные потоки, которые имеют доступ к общей памяти. Вместо этого вы должны передать классовые / статические методы (если они вам нужны) вместе с данными, с которыми вы хотите, чтобы они работали (в вашем случае self.vl). Если вам нужно обмениваться этими данными между процессами, вам придется использовать некоторую абстракцию совместно используемой памяти, например multiprocessing.Value, применяя мьютекс по пути конечно.

ОБНОВИТЬ

Я сказал, что вы можете сделать это (и есть модули, которые более или менее делают это, проверьте pathos.multiprocessing например), но я не думаю, что это стоит того - когда вы приходите к тому моменту, когда вы должны заставить свою систему делать то, что вы хотите, есть вероятность, что вы используете не ту систему или вам следует переосмыслить свой дизайн. Но для информированности, вот один способ сделать то, что вы хотите в многопроцессорной среде:

import sys
from multiprocessing import Pool

def parallel_call(params):  # a helper for calling 'remote' instances
    cls = getattr(sys.modules[__name__], params[0])  # get our class type
    instance = cls.__new__(cls)  # create a new instance without invoking __init__
    instance.__dict__ = params[1]  # apply the passed state to the new instance
    method = getattr(instance, params[2])  # get the requested method
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
    return method(*args)  # expand arguments, call our method and return the result

class A(object):

    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(parallel_call, self.prepare_call("cal", dt))
        t.close()
        return rs

    def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
        for arg in args:
            yield [self.__class__.__name__, self.__dict__, name, arg]

if __name__ == "__main__":  # important protection for cross-platform use
    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Я думаю, что это довольно понятно, как это работает, но вкратце он передает имя вашего класса, его текущее состояние (без сигналов, tho), требуемый метод для вызова и аргументы для его вызова parallel_call функция, которая вызывается для каждого процесса в Pool, Python автоматически выбирает и обрабатывает все эти данные, поэтому все parallel_call Необходимо восстановить исходный объект, найти в нем нужный метод и вызвать его с помощью предоставленных параметров.

Таким образом, мы передаем только данные, не пытаясь передать активные объекты, чтобы Python не жаловался (ну, в этом случае, попробуйте добавить ссылку на метод экземпляра в параметры вашего класса и посмотрите, что произойдет), и все работает просто отлично,

Если вы хотите серьезно использовать "магию", вы можете сделать так, чтобы она выглядела точно так же, как ваш код (создайте свой собственный Pool обработчик, выбирайте имена из функций и отправляйте имена фактическим процессам и т. д.), но это должно служить достаточной функцией для вашего примера.

Однако, прежде чем вы возродите свои надежды, имейте в виду, что это будет работать только при совместном использовании "статического" экземпляра (экземпляра, который не меняет своего начального состояния, как только вы начинаете вызывать его в многопроцессорном контексте). Если A.cal Метод заключается в изменении внутреннего состояния vl свойство - это повлияет только на экземпляр, где оно изменяется (если только оно не изменяется в главном экземпляре, который вызывает Pool между звонками). Если вы хотите поделиться государством, вы можете обновить parallel_call поднять instance.__dict__ после вызова и возврата его вместе с результатом вызова метода, то на вызывающей стороне вам придется обновить локальный __dict__ с возвращенными данными, чтобы изменить исходное состояние. И этого недостаточно - вам действительно нужно создать общий dict и обработать весь персонал мьютекса, чтобы к нему одновременно обращались все процессы (вы можете использовать multiprocessing.Manager для этого).

Так что, как я уже говорил, больше проблем, чем стоит...

Вопрос: кажется, что он не работает (как я видел в диспетчере задач, когда выполнялся только 1 процесс), и результат никогда не доставляется.

Вы видите только 1 процесс как Pool рассчитать количество используемых процессов следующим образом:
Вы даете range(10) = Индекс задачи 0,9, следовательно Pool вычисление (10 / 4) * 4 = 8+1 = 9,
После запуска первого process Больше не осталось задачи.
использование range(32) и вы увидите 4 process Бег.

Вы возвращаетесь return tвместо того, чтобы возвращать результат rs = pool.map(...,


Это будет работать, например

def cal(self, nb):
    import os
    print('pid:{} cal({})'.format(os.getpid(), nb))
    return nb * self.vl

def run(self,df):
    with mp.Pool(processes=4) as pool:
        rs = pool.map(self.cal, df)
    pool.close()
    return rs

if __name__ == '__main__':
    a = A(2)
    result = a.run(list(range(32)))
    print(result)

Протестировано с Python: 3.4.2

Другие вопросы по тегам