Вызвать многопроцессорность в методе класса Python
Изначально у меня есть класс для хранения некоторых обработанных значений и их повторного использования с другими методами.
Проблема заключается в том, что когда я пытался разделить метод класса на несколько процессов для ускорения, процессы, порожденные Python, но это, похоже, не сработало (как я видел в диспетчере задач, когда выполнялся только 1 процесс), и результат никогда не доставлялся.
Я сделал пару поисков и обнаружил, что pathos.multiprocessing может сделать это вместо этого, но мне интересно, может ли стандартная библиотека решить эту проблему?
from multiprocessing import Pool
class A():
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = Pool(processes=4)
rs = t.map(self.cal, dt)
t.close()
return t
a = A(2)
a.run(list(range(10)))
2 ответа
Ваш код не работает, так как он не может pickle
метод экземпляра (self.cal
), что Python пытается сделать, когда вы порождаете несколько процессов, сопоставляя их multiprocessing.Pool
(ну, есть способ сделать это, но он слишком запутанный и в любом случае не очень полезен) - поскольку нет доступа к разделяемой памяти, он должен "упаковать" данные и отправить их порожденному процессу для распаковки. То же самое случилось бы с вами, если бы вы попытались засолить a
пример.
Единственный доступ к общей памяти, доступный в multiprocessing
пакет малоизвестен multiprocessing.pool.ThreadPool
так что если вы действительно хотите сделать это:
from multiprocessing.pool import ThreadPool
class A():
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = ThreadPool(processes=4)
rs = t.map(self.cal, dt)
t.close()
return rs
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Но это не даст вам распараллеливания, поскольку оно по существу отображается на ваши обычные потоки, которые имеют доступ к общей памяти. Вместо этого вы должны передать классовые / статические методы (если они вам нужны) вместе с данными, с которыми вы хотите, чтобы они работали (в вашем случае self.vl
). Если вам нужно обмениваться этими данными между процессами, вам придется использовать некоторую абстракцию совместно используемой памяти, например multiprocessing.Value
, применяя мьютекс по пути конечно.
ОБНОВИТЬ
Я сказал, что вы можете сделать это (и есть модули, которые более или менее делают это, проверьте pathos.multiprocessing
например), но я не думаю, что это стоит того - когда вы приходите к тому моменту, когда вы должны заставить свою систему делать то, что вы хотите, есть вероятность, что вы используете не ту систему или вам следует переосмыслить свой дизайн. Но для информированности, вот один способ сделать то, что вы хотите в многопроцессорной среде:
import sys
from multiprocessing import Pool
def parallel_call(params): # a helper for calling 'remote' instances
cls = getattr(sys.modules[__name__], params[0]) # get our class type
instance = cls.__new__(cls) # create a new instance without invoking __init__
instance.__dict__ = params[1] # apply the passed state to the new instance
method = getattr(instance, params[2]) # get the requested method
args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
return method(*args) # expand arguments, call our method and return the result
class A(object):
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = Pool(processes=4)
rs = t.map(parallel_call, self.prepare_call("cal", dt))
t.close()
return rs
def prepare_call(self, name, args): # creates a 'remote call' package for each argument
for arg in args:
yield [self.__class__.__name__, self.__dict__, name, arg]
if __name__ == "__main__": # important protection for cross-platform use
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Я думаю, что это довольно понятно, как это работает, но вкратце он передает имя вашего класса, его текущее состояние (без сигналов, tho), требуемый метод для вызова и аргументы для его вызова parallel_call
функция, которая вызывается для каждого процесса в Pool
, Python автоматически выбирает и обрабатывает все эти данные, поэтому все parallel_call
Необходимо восстановить исходный объект, найти в нем нужный метод и вызвать его с помощью предоставленных параметров.
Таким образом, мы передаем только данные, не пытаясь передать активные объекты, чтобы Python не жаловался (ну, в этом случае, попробуйте добавить ссылку на метод экземпляра в параметры вашего класса и посмотрите, что произойдет), и все работает просто отлично,
Если вы хотите серьезно использовать "магию", вы можете сделать так, чтобы она выглядела точно так же, как ваш код (создайте свой собственный Pool
обработчик, выбирайте имена из функций и отправляйте имена фактическим процессам и т. д.), но это должно служить достаточной функцией для вашего примера.
Однако, прежде чем вы возродите свои надежды, имейте в виду, что это будет работать только при совместном использовании "статического" экземпляра (экземпляра, который не меняет своего начального состояния, как только вы начинаете вызывать его в многопроцессорном контексте). Если A.cal
Метод заключается в изменении внутреннего состояния vl
свойство - это повлияет только на экземпляр, где оно изменяется (если только оно не изменяется в главном экземпляре, который вызывает Pool
между звонками). Если вы хотите поделиться государством, вы можете обновить parallel_call
поднять instance.__dict__
после вызова и возврата его вместе с результатом вызова метода, то на вызывающей стороне вам придется обновить локальный __dict__
с возвращенными данными, чтобы изменить исходное состояние. И этого недостаточно - вам действительно нужно создать общий dict и обработать весь персонал мьютекса, чтобы к нему одновременно обращались все процессы (вы можете использовать multiprocessing.Manager
для этого).
Так что, как я уже говорил, больше проблем, чем стоит...
Вопрос: кажется, что он не работает (как я видел в диспетчере задач, когда выполнялся только 1 процесс), и результат никогда не доставляется.
Вы видите только 1 процесс как Pool
рассчитать количество используемых процессов следующим образом:
Вы даете range(10)
= Индекс задачи 0,9, следовательно Pool
вычисление (10 / 4) * 4 = 8+1 = 9
,
После запуска первого process
Больше не осталось задачи.
использование range(32)
и вы увидите 4 process
Бег.
Вы возвращаетесь return t
вместо того, чтобы возвращать результат rs = pool.map(...
,
Это будет работать, например
def cal(self, nb):
import os
print('pid:{} cal({})'.format(os.getpid(), nb))
return nb * self.vl
def run(self,df):
with mp.Pool(processes=4) as pool:
rs = pool.map(self.cal, df)
pool.close()
return rs
if __name__ == '__main__':
a = A(2)
result = a.run(list(range(32)))
print(result)
Протестировано с Python: 3.4.2