Python многопроцессорные подпроцессы с упорядоченной печатью?

Я пытаюсь запустить некоторые функции Python параллельно, у которых есть команды печати по всей функции. Я хочу, чтобы каждый подпроцесс выполнял одну и ту же функцию для группового вывода на основной стандартный вывод. Под этим я подразумеваю, что я хочу, чтобы вывод каждого подпроцесса печатался только после того, как он завершил свою задачу. Однако, если во время этого процесса произошла какая-то ошибка, я хочу вывести все, что было сделано в подпроцессе.

Небольшой пример:

from time import sleep
import multiprocessing as mp


def foo(x):
    print('foo')
    for i in range(5):
        print('Process {}: in foo {}'.format(x, i))
        sleep(0.5)


if __name__ == '__main__':
    pool = mp.Pool()

    jobs = []
    for i in range(4):
        job = pool.apply_async(foo, args=[i])
        jobs.append(job)

    for job in jobs:
        job.wait()

Это работает параллельно, но выводится:

foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: in foo 1
Process 2: in foo 1
Process 3: in foo 1
Process 1: in foo 2
Process 0: in foo 2
Process 2: in foo 2
Process 3: in foo 2
Process 1: in foo 3
Process 0: in foo 3
Process 3: in foo 3
Process 2: in foo 3
Process 1: in foo 4
Process 0: in foo 4
Process 3: in foo 4
Process 2: in foo 4

Что я хочу это:

foo
Process 3: in foo 0
Process 3: in foo 1
Process 3: in foo 2
Process 3: in foo 3
Process 3: in foo 4
foo
Process 1: in foo 0
Process 1: in foo 1
Process 1: in foo 2
Process 1: in foo 3
Process 1: in foo 4
foo
Process 0: in foo 0
Process 0: in foo 1
Process 0: in foo 2
Process 0: in foo 3
Process 0: in foo 4
foo
Process 2: in foo 0
Process 2: in foo 1
Process 2: in foo 2
Process 2: in foo 3
Process 2: in foo 4

Не имеет значения конкретный порядок любого процесса, если каждый вывод сгруппирован для каждого подпроцесса. Интересно, что я получаю желаемый результат, если я делаю

python test.py > output

Я знаю, что каждый подпроцесс не получает свой собственный стандартный вывод, вместо этого он использует основной стандартный вывод. Я подумал и посмотрел некоторые решения для этого, такие как создание так, чтобы мы использовали Очередь, и у каждого подпроцесса был свой собственный stdout, и затем, когда это будет сделано, мы переопределяем команду сброса, чтобы мы могли вывести вывод обратно в очередь. После этого мы можем прочитать содержимое. Однако, хотя это удовлетворяет тому, что я хочу, я не могу получить вывод, если функция остановилась на полпути. Он будет выводиться только после успешного завершения. Получил отсюда Доступ к стандартному выводу подпроцесса в python

Я также видел использование блокировок, которые работают, но полностью убивают параллельное выполнение функции, так как приходится ждать, пока каждый подпроцесс выполнит функцию, выполняющую функцию foo.

Также, если возможно, я бы хотел избежать изменения реализации моей функции foo, так как у меня есть много функций, которые мне нужно изменить.

РЕДАКТИРОВАТЬ: Я посмотрел в дисплеях библиотеки и параллельного Python. Dispy делает именно то, что я хочу, где у него есть отдельный stdout/stderr, который я могу просто распечатать в конце, но проблема с dispy заключается в том, что мне приходится вручную запускать сервер в отдельном терминале. Я хочу иметь возможность запускать мою программу на Python сразу, без необходимости открывать другой скрипт. Параллельный Python, с другой стороны, делает то, что я хочу, но, похоже, ему не хватает контроля над ним, а также некоторые раздражающие неудобства для него. В частности, когда вы распечатываете вывод, он также выводит тип возврата функции, я просто хочу вывод, который я распечатал с помощью print. Кроме того, когда вы запускаете функцию, вы должны предоставить ей список модулей, которые она использует, это немного раздражает, так как я не хочу иметь большой список импорта только для запуска простой функции.

1 ответ

Решение

Как вы заметили, использование блокировки в этом случае убило бы многопроцессорность, потому что вы, по сути, заставили бы все процессы ожидать освобождения мьютекса от процесса, который в настоящее время обладает "правами" на STDOUT. Однако параллельная работа и синхронная печать с вашей функцией / подпроцессом логически исключительны.

Вместо этого вы можете сделать так, чтобы ваш основной процесс служил "принтером" для ваших подпроцессов таким образом, чтобы как только ваш подпроцесс завершал / ошибался, тогда и только тогда он отправлял обратно вашему главному процессу, что печатать. Похоже, вы совершенно довольны тем, что печатаете не в режиме реального времени (как, впрочем, и раньше, как это уже упоминалось), поэтому такой подход должен служить вам как раз. Так:

import multiprocessing as mp
import random  # just to add some randomness
from time import sleep

def foo(x):
    output = ["[Process {}]: foo:".format(x)]
    for i in range(5):
        output.append('[Process {}] in foo {}'.format(x, i))
        sleep(0.2 + 1 * random.random())
    return "\n".join(output)

if __name__ == '__main__':
    pool = mp.Pool(4)
    for res in pool.imap_unordered(foo, range(4)):
        print("[MAIN]: Process finished, response:")
        print(res)  # this will print as soon as one of the processes finishes/errors
    pool.close()

Что даст вам (YMMV, конечно):

[MAIN]: Process finished, response:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] in foo 2
[Process 2] in foo 3
[Process 2] in foo 4
[MAIN]: Process finished, response:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[MAIN]: Process finished, response:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4

Вы можете наблюдать все остальное, в том числе ошибки, таким же образом.

ОБНОВЛЕНИЕ - Если вам абсолютно необходимо использовать функции, вывод которых вы не можете контролировать, вы можете вместо этого обернуть свои подпроцессы и захватить их STDOUT/STDERR, а затем, как только они будут выполнены (или вызвать исключение), вы можете вернуть все обратно менеджеру процесса. для печати на фактический STDOUT. С такой настройкой мы можем иметь foo() лайк:

def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(5):
        print('[Process {}] in foo {}'.format(x, i))
        sleep(0.2 + 1 * random.random())
        if random.random() < 0.0625:  # let's add a 1/4 chance to err:
            raise Exception("[Process {}] A random exception is random!".format(x))
    return random.random() * 100  # just a random response, you can omit it

Обратите внимание, что он блаженно не подозревает о том, что пытается что-то связать с его режимом работы. Затем мы создадим внешнюю оболочку общего назначения (так что вам не нужно изменять ее в зависимости от функций), чтобы фактически связываться с ее поведением по умолчанию (и не только с этими функциями, но со всем остальным, что он мог бы вызвать во время работы):

def std_wrapper(args):
    try:
        from StringIO import StringIO  # ... for Python 2.x compatibility
    except ImportError:
        from io import StringIO
    import sys
    sys.stdout, sys.stderr = StringIO(), StringIO()  # replace stdout/err with our buffers
    # args is a list packed as: [0] process function name; [1] args; [2] kwargs; lets unpack:
    process_name = args[0]
    process_args = args[1] if len(args) > 1 else []
    process_kwargs = args[2] if len(args) > 2 else {}
    # get our method from its name, assuming global namespace of the current module/script
    process = globals()[process_name]
    response = None  # in case a call fails
    try:
        response = process(*process_args, **process_kwargs)  # call our process function
    except Exception as e:  # too broad but good enough as an example
        print(e)
    # rewind our buffers:
    sys.stdout.seek(0)
    sys.stderr.seek(0)
    # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
    return sys.stdout.read(), sys.stderr.read(), response

Теперь все, что нам нужно, это вызвать эту оболочку вместо желаемого foo()и предоставьте ему информацию о том, что звонить от нашего имени:

if __name__ == '__main__':
    pool = mp.Pool(4)
    # since we're wrapping the process we're calling, we need to send to the wrapper packed
    # data with instructions on what to call on our behalf.
    # info on args packing available in the std_wrapper function above.
    for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # remove the trailing space for niceness, print err if you want
    pool.close()

Так что теперь, если вы запустите его, вы получите что-то вроде этого:

[MAIN]: Process finished, response: None, STDOUT:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] A random exception is random!
[MAIN]: Process finished, response: 87.9658471743586, STDOUT:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response: 38.929554421661194, STDOUT:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
[MAIN]: Process finished, response: None, STDOUT:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[Process 0] A random exception is random!

Несмотря на foo() просто распечатка или ошибка. Конечно, вы можете использовать такую ​​оболочку для вызова любой функции и передачи ей любого количества аргументов /kwargs.

ОБНОВЛЕНИЕ № 2 - Но подождите! Если мы сможем обернуть наши функциональные процессы таким образом и получить их STDOUT/STDERR, мы, несомненно, можем превратить это в декоратор и использовать его в нашем коде с простым оформлением. Итак, для моего окончательного предложения:

import functools
import multiprocessing
import random  # just to add some randomness
import time

def std_wrapper(func):
    @functools.wraps(func)  # we need this to unravel the target function name
    def caller(*args, **kwargs):  # and now for the wrapper, nothing new here
        try:
            from StringIO import StringIO  # ... for Python 2.x compatibility
        except ImportError:
            from io import StringIO
        import sys
        sys.stdout, sys.stderr = StringIO(), StringIO()  # use our buffers instead
        response = None  # in case a call fails
        try:
            response = func(*args, **kwargs)  # call our wrapped process function
        except Exception as e:  # too broad but good enough as an example
            print(e)  # NOTE: the exception is also printed to the captured STDOUT
        # rewind our buffers:
        sys.stdout.seek(0)
        sys.stderr.seek(0)
        # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
        return sys.stdout.read(), sys.stderr.read(), response
    return caller

@std_wrapper  # decorate any function, it won't know you're siphoning its STDOUT/STDERR
def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(5):
        print('[Process {}] in foo {}'.format(x, i))
        time.sleep(0.2 + 1 * random.random())
        if random.random() < 0.0625:  # let's add a 1/4 chance to err:
            raise Exception("[Process {}] A random exception is random!".format(x))
    return random.random() * 100  # just a random response, you can omit it

И теперь мы можем вызывать наши упакованные функции, как и раньше, не имея дело с упаковкой аргументов или чем-то в этом роде, поэтому мы снова находимся:

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    for out, err, res in pool.imap_unordered(foo, range(4)):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # remove the trailing space for niceness, print err if you want
    pool.close()

Вывод такой же, как в предыдущем примере, но в гораздо более приятном и управляемом пакете.

Другие вопросы по тегам