Как порождать параллельные дочерние процессы в многопроцессорной системе?

У меня есть скрипт Python, который я хочу использовать в качестве контроллера для другого скрипта Python. У меня есть сервер с 64 процессорами, поэтому я хочу создать до 64 дочерних процессов этого второго скрипта Python. Дочерний скрипт называется:

$ python create_graphs.py --name=NAME

где NAME - это что-то вроде XYZ, ABC, NYU и т. д.

В моем сценарии родительского контроллера я извлекаю переменную name из списка:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

Итак, мой вопрос, каков наилучший способ порождать эти процессы в детстве? Я хочу ограничить количество детей до 64 одновременно, поэтому необходимо отслеживать состояние (если дочерний процесс завершен или нет), чтобы я мог эффективно поддерживать работу всего поколения.

Я изучил использование пакета подпроцесса, но отклонил его, потому что он порождает только одного ребенка за раз. Я наконец нашел многопроцессорный пакет, но я признаюсь, что перегружен всей документацией о потоках и подпроцессах.

Прямо сейчас мой сценарий использует subprocess.call порождает только одного ребенка за раз и выглядит так:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

Я действительно хочу, чтобы это породило 64 детей одновременно. В других вопросах о стековом потоке я видел людей, использующих очередь, но кажется, что это приводит к снижению производительности?

4 ответа

Решение

Что вам нужно, так это класс пула процессов в многопроцессорной среде.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

А вот пример расчета, чтобы его было легче понять. Следующее разделит 10000 задач на N процессов, где N - это число процессоров. Обратите внимание, что я передаю None как число процессов. Это заставит класс Pool использовать cpu_count для количества процессов ( ссылка)

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results

Вот решение, которое я придумал, основываясь на комментариях Нади и Джима. Я не уверен, что это лучший способ, но он работает. Вызываемый исходный дочерний сценарий должен быть сценарием оболочки, потому что мне нужно использовать некоторые сторонние приложения, включая Matlab. Поэтому мне пришлось взять его из Python и написать код в bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

Это кажется разумным решением? Я пытался использовать формат цикла while Джима, но мой сценарий просто ничего не возвращал. Я не уверен, почему это будет. Вот вывод, когда я запускаю скрипт с циклом Jim'while', заменяющим цикл 'for':

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

Когда я запускаю его с помощью цикла for, я получаю нечто более значимое:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

Так что это работает, и я счастлив. Тем не менее, я до сих пор не понимаю, почему я не могу использовать цикл стиля "пока" Джима вместо цикла "для", который я использую. Спасибо за помощь - я впечатлен широтой знаний @ stackru.

Я не думаю, что вам нужна очередь, если вы не собираетесь извлекать данные из приложений (что, если вам нужны данные, я думаю, что в любом случае будет проще добавить их в базу данных)

но попробуйте это для размера:

поместите все содержимое скрипта create_graphs.py в функцию с именем create_graphs

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

Я знаю, что это приведет к тому, что количество потоков будет на 1 меньше, чем у процессоров, что, вероятно, хорошо, и оставляет процессору управление потоками, дисковым вводом-выводом и другими событиями, происходящими на компьютере. Если вы решили, что хотите использовать последнее ядро, просто добавьте его к нему.

редактировать: я думаю, что, возможно, неправильно истолковал цель my_list. Вы не должны my_list отслеживать потоки вообще (поскольку на них все ссылаются элементы в threads список). Но это прекрасный способ подачи входных данных процессов - или даже лучше: используйте функцию генератора;)

Цель my_list а также threads

my_list содержит данные, которые вам нужно обработать в вашей функции
threads это просто список текущих запущенных потоков

цикл while делает две вещи, запускает новые потоки для обработки данных и проверяет, запущены ли какие-либо потоки.

Так что, если у вас есть (а) больше данных для обработки или (б) потоки, которые еще не завершены... вы хотите, чтобы программа продолжала работать. Как только оба списка будут пусты, они оценят False и цикл while выйдет

Я бы определенно использовал многопроцессорность вместо того, чтобы использовать собственное решение с использованием подпроцесса.

Другие вопросы по тегам