Как я могу восстановить возвращаемое значение функции, переданной multiprocessing.Process?
В приведенном ниже примере кода я хотел бы восстановить возвращаемое значение функции worker
, Как я могу сделать это? Где хранится это значение?
Пример кода:
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
Выход:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
Я не могу найти соответствующий атрибут в объектах, хранящихся в jobs
,
Заранее спасибо, blz
13 ответов
Используйте общую переменную для общения. Например, вот так:
import multiprocessing
def worker(procnum, return_dict):
'''worker function'''
print str(procnum) + ' represent!'
return_dict[procnum] = procnum
if __name__ == '__main__':
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print return_dict.values()
Я думаю, что подход, предложенный @sega_sai, является лучшим. Но это действительно нуждается в примере кода, так что здесь идет:
import multiprocessing
from os import getpid
def worker(procnum):
print 'I am number %d in process %d' % (procnum, getpid())
return getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print pool.map(worker, range(5))
Который будет печатать возвращаемые значения:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
Если вы знакомы с map
(встроенный в Python 2) это не должно быть слишком сложным. В противном случае посмотрите на ссылку sega_Sai.
Обратите внимание, как мало кода требуется. (Также обратите внимание, как процессы используются повторно).
Для тех, кто ищет, как получить ценность от Process
с помощью Queue
:
import multiprocessing
ret = {'foo': False}
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
print queue.get() # Prints {"foo": True}
p.join()
По какой-то причине я не смог найти общий пример того, как это сделать с Queue
где угодно (даже примеры документов Python не порождают несколько процессов), так что вот что я получил после 10 попыток:
def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue
это блокирующая потокобезопасная очередь, которую вы можете использовать для хранения возвращаемых значений от дочерних процессов. Таким образом, вы должны передать очередь каждому процессу. Что-то менее очевидное в том, что вы должны get()
из очереди перед вами join
Process
или очередь заполняется и блокирует все.
Обновление для тех, кто является объектно-ориентированным (протестировано в Python 3.4):
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
В этом примере показано, как использовать список экземпляров multiprocessing.Pipe для возврата строк из произвольного числа процессов:
import multiprocessing
def worker(procnum, send_end):
'''worker function'''
result = str(procnum) + ' represent!'
print result
send_end.send(result)
def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list
if __name__ == '__main__':
main()
Выход:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
Это решение использует меньше ресурсов, чем многопроцессорное. Вопрос, который использует
- труба
- хотя бы один замок
- буфер
- Тема
или мультипроцессинг. Простой вопрос, который использует
- труба
- хотя бы один замок
Очень поучительно посмотреть на источник для каждого из этих типов.
Кажется, что вы должны использовать вместо этого класс multiprocessing.Pool и использовать методы.apply ().apply_async (), map ()
http://docs.python.org/library/multiprocessing.html?highlight=pool
Вы можете использовать exit
встроенный, чтобы установить код выхода процесса. Его можно получить из exitcode
атрибут процесса:
import multiprocessing
def worker(procnum):
print str(procnum) + ' represent!'
exit(procnum)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result
Выход:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Пакет Pebble имеет приятное использование абстракции multiprocessing.Pipe
что делает это довольно просто:
from pebble import concurrent
@concurrent.process
def function(arg, kwarg=0):
return arg + kwarg
future = function(1, kwarg=1)
print(future.result())
Пример из: https://pythonhosted.org/Pebble/
Думал, что упросту простейшие примеры, скопированные сверху, работая для меня на Py3.6. Самый простой multiprocessing.Pool
:
import multiprocessing
import time
def worker(x):
time.sleep(1)
return x
pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))
Вы можете установить количество процессов в пуле, например, Pool(processes=5)
. Однако по умолчанию используется счетчик ЦП, поэтому оставьте поле пустым для задач, связанных с ЦП. (Задачи, связанные с вводом-выводом, в любом случае часто подходят потокам, поскольку потоки в основном ждут, поэтому могут совместно использовать ядро ЦП.)Pool
также применяет оптимизацию фрагментов.
(Обратите внимание, что рабочий метод не может быть вложен в метод. Сначала я определил свой рабочий метод внутри метода, который вызывает вызов pool.map
, чтобы все это оставалось самодостаточным, но тогда процессы не могли его импортировать и выдавали "AttributeError: Can't pickle local object outer_method..inner_method". Подробнее здесь. Это может быть внутри класса.)
(Оцените исходный вопрос, указанный в печати 'represent!'
скорее, чем time.sleep()
, но без него я думал, что какой-то код работает одновременно, хотя это не так.)
Py3's ProcessPoolExecutor
также две строки (.map
возвращает генератор, поэтому вам нужен list()
):
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(10))))
С простым Process
es:
import multiprocessing
import time
def worker(x, queue):
time.sleep(1)
queue.put(x)
queue = multiprocessing.SimpleQueue()
tasks = range(10)
for task in tasks:
multiprocessing.Process(target=worker, args=(task, queue,)).start()
for _ in tasks:
print(queue.get())
Использовать SimpleQueue
если все, что тебе нужно, это put
а также get
. Первый цикл запускает все процессы, прежде чем второй выполнит блокировку.queue.get
звонки. Не думаю, что есть причина звонитьp.join()
тоже.
Если вы используете Python 3, вы можете использовать concurrent.futures.ProcessPoolExecutor
в качестве удобной абстракции:
from concurrent.futures import ProcessPoolExecutor
def worker(procnum):
'''worker function'''
print(str(procnum) + ' represent!')
return procnum
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(5))))
Выход:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Вы можете использовать ProcessPoolExecutor для получения возвращаемого значения из функции, как показано ниже:
from concurrent.futures import ProcessPoolExecutor
def test(num1, num2):
return num1 + num2
with ProcessPoolExecutor() as executor:
feature = executor.submit(test, 2, 3)
print(feature.result()) # 5
Простое решение:
import multiprocessing
output=[]
data = range(0,10)
def f(x):
return x**2
def handler():
p = multiprocessing.Pool(64)
r=p.map(f, data)
return r
if __name__ == '__main__':
output.append(handler())
print(output[0])
Выход:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Я немного изменил ответ vartec, так как мне нужно было получить коды ошибок из функции. (Спасибо Vertec!!! это удивительный трюк)
Это также можно сделать с помощью manager.list
но я думаю, что лучше иметь это в диктовке и хранить список в нем. Таким образом, мы сохраняем функцию и результаты, поскольку не можем быть уверены в том, в каком порядке будет заполняться список.
from multiprocessing import Process
import time
import datetime
import multiprocessing
def func1(fn, m_list):
print 'func1: starting'
time.sleep(1)
m_list[fn] = "this is the first function"
print 'func1: finishing'
# return "func1" # no need for return since Multiprocess doesnt return it =(
def func2(fn, m_list):
print 'func2: starting'
time.sleep(3)
m_list[fn] = "this is function 2"
print 'func2: finishing'
# return "func2"
def func3(fn, m_list):
print 'func3: starting'
time.sleep(9)
# if fail wont join the rest because it never populate the dict
# or do a try/except to get something in return.
raise ValueError("failed here")
# if we want to get the error in the manager dict we can catch the error
try:
raise ValueError("failed here")
m_list[fn] = "this is third"
except:
m_list[fn] = "this is third and it fail horrible"
# print 'func3: finishing'
# return "func3"
def runInParallel(*fns): # * is to accept any input in list
start_time = datetime.datetime.now()
proc = []
manager = multiprocessing.Manager()
m_list = manager.dict()
for fn in fns:
# print fn
# print dir(fn)
p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
p.start()
proc.append(p)
for p in proc:
p.join() # 5 is the time out
print datetime.datetime.now() - start_time
return m_list, proc
if __name__ == '__main__':
manager, proc = runInParallel(func1, func2, func3)
# print dir(proc[0])
# print proc[0]._name
# print proc[0].name
# print proc[0].exitcode
# here you can check what did fail
for i in proc:
print i.name, i.exitcode # name was set up in the Process line 53
# here will only show the function that worked and where able to populate the
# manager dict
for i, j in manager.items():
print dir(i) # things you can do to the function
print i, j