Python многопроцессорные подпроцессы с упорядоченной печатью?
Я пытаюсь запустить некоторые функции Python параллельно, у которых есть команды печати по всей функции. Я хочу, чтобы каждый подпроцесс выполнял одну и ту же функцию для группового вывода на основной стандартный вывод. Под этим я подразумеваю, что я хочу, чтобы вывод каждого подпроцесса печатался только после того, как он завершил свою задачу. Однако, если во время этого процесса произошла какая-то ошибка, я хочу вывести все, что было сделано в подпроцессе.
Небольшой пример:
from time import sleep
import multiprocessing as mp
def foo(x):
print('foo')
for i in range(5):
print('Process {}: in foo {}'.format(x, i))
sleep(0.5)
if __name__ == '__main__':
pool = mp.Pool()
jobs = []
for i in range(4):
job = pool.apply_async(foo, args=[i])
jobs.append(job)
for job in jobs:
job.wait()
Это работает параллельно, но выводится:
foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: in foo 1
Process 2: in foo 1
Process 3: in foo 1
Process 1: in foo 2
Process 0: in foo 2
Process 2: in foo 2
Process 3: in foo 2
Process 1: in foo 3
Process 0: in foo 3
Process 3: in foo 3
Process 2: in foo 3
Process 1: in foo 4
Process 0: in foo 4
Process 3: in foo 4
Process 2: in foo 4
Что я хочу это:
foo
Process 3: in foo 0
Process 3: in foo 1
Process 3: in foo 2
Process 3: in foo 3
Process 3: in foo 4
foo
Process 1: in foo 0
Process 1: in foo 1
Process 1: in foo 2
Process 1: in foo 3
Process 1: in foo 4
foo
Process 0: in foo 0
Process 0: in foo 1
Process 0: in foo 2
Process 0: in foo 3
Process 0: in foo 4
foo
Process 2: in foo 0
Process 2: in foo 1
Process 2: in foo 2
Process 2: in foo 3
Process 2: in foo 4
Не имеет значения конкретный порядок любого процесса, если каждый вывод сгруппирован для каждого подпроцесса. Интересно, что я получаю желаемый результат, если я делаю
python test.py > output
Я знаю, что каждый подпроцесс не получает свой собственный стандартный вывод, вместо этого он использует основной стандартный вывод. Я подумал и посмотрел некоторые решения для этого, такие как создание так, чтобы мы использовали Очередь, и у каждого подпроцесса был свой собственный stdout, и затем, когда это будет сделано, мы переопределяем команду сброса, чтобы мы могли вывести вывод обратно в очередь. После этого мы можем прочитать содержимое. Однако, хотя это удовлетворяет тому, что я хочу, я не могу получить вывод, если функция остановилась на полпути. Он будет выводиться только после успешного завершения. Получил отсюда Доступ к стандартному выводу подпроцесса в python
Я также видел использование блокировок, которые работают, но полностью убивают параллельное выполнение функции, так как приходится ждать, пока каждый подпроцесс выполнит функцию, выполняющую функцию foo.
Также, если возможно, я бы хотел избежать изменения реализации моей функции foo, так как у меня есть много функций, которые мне нужно изменить.
РЕДАКТИРОВАТЬ: Я посмотрел в дисплеях библиотеки и параллельного Python. Dispy делает именно то, что я хочу, где у него есть отдельный stdout/stderr, который я могу просто распечатать в конце, но проблема с dispy заключается в том, что мне приходится вручную запускать сервер в отдельном терминале. Я хочу иметь возможность запускать мою программу на Python сразу, без необходимости открывать другой скрипт. Параллельный Python, с другой стороны, делает то, что я хочу, но, похоже, ему не хватает контроля над ним, а также некоторые раздражающие неудобства для него. В частности, когда вы распечатываете вывод, он также выводит тип возврата функции, я просто хочу вывод, который я распечатал с помощью print. Кроме того, когда вы запускаете функцию, вы должны предоставить ей список модулей, которые она использует, это немного раздражает, так как я не хочу иметь большой список импорта только для запуска простой функции.
1 ответ
Как вы заметили, использование блокировки в этом случае убило бы многопроцессорность, потому что вы, по сути, заставили бы все процессы ожидать освобождения мьютекса от процесса, который в настоящее время обладает "правами" на STDOUT. Однако параллельная работа и синхронная печать с вашей функцией / подпроцессом логически исключительны.
Вместо этого вы можете сделать так, чтобы ваш основной процесс служил "принтером" для ваших подпроцессов таким образом, чтобы как только ваш подпроцесс завершал / ошибался, тогда и только тогда он отправлял обратно вашему главному процессу, что печатать. Похоже, вы совершенно довольны тем, что печатаете не в режиме реального времени (как, впрочем, и раньше, как это уже упоминалось), поэтому такой подход должен служить вам как раз. Так:
import multiprocessing as mp
import random # just to add some randomness
from time import sleep
def foo(x):
output = ["[Process {}]: foo:".format(x)]
for i in range(5):
output.append('[Process {}] in foo {}'.format(x, i))
sleep(0.2 + 1 * random.random())
return "\n".join(output)
if __name__ == '__main__':
pool = mp.Pool(4)
for res in pool.imap_unordered(foo, range(4)):
print("[MAIN]: Process finished, response:")
print(res) # this will print as soon as one of the processes finishes/errors
pool.close()
Что даст вам (YMMV, конечно):
[MAIN]: Process finished, response:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] in foo 2
[Process 2] in foo 3
[Process 2] in foo 4
[MAIN]: Process finished, response:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[MAIN]: Process finished, response:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
Вы можете наблюдать все остальное, в том числе ошибки, таким же образом.
ОБНОВЛЕНИЕ - Если вам абсолютно необходимо использовать функции, вывод которых вы не можете контролировать, вы можете вместо этого обернуть свои подпроцессы и захватить их STDOUT/STDERR, а затем, как только они будут выполнены (или вызвать исключение), вы можете вернуть все обратно менеджеру процесса. для печати на фактический STDOUT. С такой настройкой мы можем иметь foo()
лайк:
def foo(x):
print("[Process {}]: foo:".format(x))
for i in range(5):
print('[Process {}] in foo {}'.format(x, i))
sleep(0.2 + 1 * random.random())
if random.random() < 0.0625: # let's add a 1/4 chance to err:
raise Exception("[Process {}] A random exception is random!".format(x))
return random.random() * 100 # just a random response, you can omit it
Обратите внимание, что он блаженно не подозревает о том, что пытается что-то связать с его режимом работы. Затем мы создадим внешнюю оболочку общего назначения (так что вам не нужно изменять ее в зависимости от функций), чтобы фактически связываться с ее поведением по умолчанию (и не только с этими функциями, но со всем остальным, что он мог бы вызвать во время работы):
def std_wrapper(args):
try:
from StringIO import StringIO # ... for Python 2.x compatibility
except ImportError:
from io import StringIO
import sys
sys.stdout, sys.stderr = StringIO(), StringIO() # replace stdout/err with our buffers
# args is a list packed as: [0] process function name; [1] args; [2] kwargs; lets unpack:
process_name = args[0]
process_args = args[1] if len(args) > 1 else []
process_kwargs = args[2] if len(args) > 2 else {}
# get our method from its name, assuming global namespace of the current module/script
process = globals()[process_name]
response = None # in case a call fails
try:
response = process(*process_args, **process_kwargs) # call our process function
except Exception as e: # too broad but good enough as an example
print(e)
# rewind our buffers:
sys.stdout.seek(0)
sys.stderr.seek(0)
# return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
return sys.stdout.read(), sys.stderr.read(), response
Теперь все, что нам нужно, это вызвать эту оболочку вместо желаемого foo()
и предоставьте ему информацию о том, что звонить от нашего имени:
if __name__ == '__main__':
pool = mp.Pool(4)
# since we're wrapping the process we're calling, we need to send to the wrapper packed
# data with instructions on what to call on our behalf.
# info on args packing available in the std_wrapper function above.
for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
print(out.rstrip()) # remove the trailing space for niceness, print err if you want
pool.close()
Так что теперь, если вы запустите его, вы получите что-то вроде этого:
[MAIN]: Process finished, response: None, STDOUT:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] A random exception is random!
[MAIN]: Process finished, response: 87.9658471743586, STDOUT:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response: 38.929554421661194, STDOUT:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
[MAIN]: Process finished, response: None, STDOUT:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[Process 0] A random exception is random!
Несмотря на foo()
просто распечатка или ошибка. Конечно, вы можете использовать такую оболочку для вызова любой функции и передачи ей любого количества аргументов /kwargs.
ОБНОВЛЕНИЕ № 2 - Но подождите! Если мы сможем обернуть наши функциональные процессы таким образом и получить их STDOUT/STDERR, мы, несомненно, можем превратить это в декоратор и использовать его в нашем коде с простым оформлением. Итак, для моего окончательного предложения:
import functools
import multiprocessing
import random # just to add some randomness
import time
def std_wrapper(func):
@functools.wraps(func) # we need this to unravel the target function name
def caller(*args, **kwargs): # and now for the wrapper, nothing new here
try:
from StringIO import StringIO # ... for Python 2.x compatibility
except ImportError:
from io import StringIO
import sys
sys.stdout, sys.stderr = StringIO(), StringIO() # use our buffers instead
response = None # in case a call fails
try:
response = func(*args, **kwargs) # call our wrapped process function
except Exception as e: # too broad but good enough as an example
print(e) # NOTE: the exception is also printed to the captured STDOUT
# rewind our buffers:
sys.stdout.seek(0)
sys.stderr.seek(0)
# return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
return sys.stdout.read(), sys.stderr.read(), response
return caller
@std_wrapper # decorate any function, it won't know you're siphoning its STDOUT/STDERR
def foo(x):
print("[Process {}]: foo:".format(x))
for i in range(5):
print('[Process {}] in foo {}'.format(x, i))
time.sleep(0.2 + 1 * random.random())
if random.random() < 0.0625: # let's add a 1/4 chance to err:
raise Exception("[Process {}] A random exception is random!".format(x))
return random.random() * 100 # just a random response, you can omit it
И теперь мы можем вызывать наши упакованные функции, как и раньше, не имея дело с упаковкой аргументов или чем-то в этом роде, поэтому мы снова находимся:
if __name__ == '__main__':
pool = multiprocessing.Pool(4)
for out, err, res in pool.imap_unordered(foo, range(4)):
print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
print(out.rstrip()) # remove the trailing space for niceness, print err if you want
pool.close()
Вывод такой же, как в предыдущем примере, но в гораздо более приятном и управляемом пакете.