Как порождать параллельные дочерние процессы в многопроцессорной системе?
У меня есть скрипт Python, который я хочу использовать в качестве контроллера для другого скрипта Python. У меня есть сервер с 64 процессорами, поэтому я хочу создать до 64 дочерних процессов этого второго скрипта Python. Дочерний скрипт называется:
$ python create_graphs.py --name=NAME
где NAME - это что-то вроде XYZ, ABC, NYU и т. д.
В моем сценарии родительского контроллера я извлекаю переменную name из списка:
my_list = [ 'XYZ', 'ABC', 'NYU' ]
Итак, мой вопрос, каков наилучший способ порождать эти процессы в детстве? Я хочу ограничить количество детей до 64 одновременно, поэтому необходимо отслеживать состояние (если дочерний процесс завершен или нет), чтобы я мог эффективно поддерживать работу всего поколения.
Я изучил использование пакета подпроцесса, но отклонил его, потому что он порождает только одного ребенка за раз. Я наконец нашел многопроцессорный пакет, но я признаюсь, что перегружен всей документацией о потоках и подпроцессах.
Прямо сейчас мой сценарий использует subprocess.call
порождает только одного ребенка за раз и выглядит так:
#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process
my_list = [ 'XYZ', 'ABC', 'NYU' ]
if __name__ == '__main__':
processors = multiprocessing.cpu_count()
for i in range(len(my_list)):
if( i < processors ):
cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
child = subprocess.call( cmd, shell=False )
Я действительно хочу, чтобы это породило 64 детей одновременно. В других вопросах о стековом потоке я видел людей, использующих очередь, но кажется, что это приводит к снижению производительности?
4 ответа
Что вам нужно, так это класс пула процессов в многопроцессорной среде.
import multiprocessing
import subprocess
def work(cmd):
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=count)
print pool.map(work, ['ls'] * count)
А вот пример расчета, чтобы его было легче понять. Следующее разделит 10000 задач на N процессов, где N - это число процессоров. Обратите внимание, что я передаю None как число процессов. Это заставит класс Pool использовать cpu_count для количества процессов ( ссылка)
import multiprocessing
import subprocess
def calculate(value):
return value * 10
if __name__ == '__main__':
pool = multiprocessing.Pool(None)
tasks = range(10000)
results = []
r = pool.map_async(calculate, tasks, callback=results.append)
r.wait() # Wait on the results
print results
Вот решение, которое я придумал, основываясь на комментариях Нади и Джима. Я не уверен, что это лучший способ, но он работает. Вызываемый исходный дочерний сценарий должен быть сценарием оболочки, потому что мне нужно использовать некоторые сторонние приложения, включая Matlab. Поэтому мне пришлось взять его из Python и написать код в bash.
import sys
import os
import multiprocessing
import subprocess
def work(staname):
print 'Processing station:',staname
print 'Parent process:', os.getppid()
print 'Process id:', os.getpid()
cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
my_list = [ 'XYZ', 'ABC', 'NYU' ]
my_list.sort()
print my_list
# Get the number of processors available
num_processes = multiprocessing.cpu_count()
threads = []
len_stas = len(my_list)
print "+++ Number of stations to process: %s" % (len_stas)
# run until all the threads are done, and there is no data left
for list_item in my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if( len(threads) < num_processes ):
p = multiprocessing.Process(target=work,args=[list_item])
p.start()
print p, p.is_alive()
threads.append(p)
else:
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
Это кажется разумным решением? Я пытался использовать формат цикла while Джима, но мой сценарий просто ничего не возвращал. Я не уверен, почему это будет. Вот вывод, когда я запускаю скрипт с циклом Jim'while', заменяющим цикл 'for':
hostname{me}2% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%
Когда я запускаю его с помощью цикла for, я получаю нечто более значимое:
hostname{me}6% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%
Так что это работает, и я счастлив. Тем не менее, я до сих пор не понимаю, почему я не могу использовать цикл стиля "пока" Джима вместо цикла "для", который я использую. Спасибо за помощь - я впечатлен широтой знаний @ stackru.
Я не думаю, что вам нужна очередь, если вы не собираетесь извлекать данные из приложений (что, если вам нужны данные, я думаю, что в любом случае будет проще добавить их в базу данных)
но попробуйте это для размера:
поместите все содержимое скрипта create_graphs.py в функцию с именем create_graphs
import threading
from create_graphs import create_graphs
num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]
threads = []
# run until all the threads are done, and there is no data left
while threads or my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if (len(threads) < num_processes) and my_list:
t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
t.setDaemon(True)
t.start()
threads.append(t)
# in the case that we have the maximum number of threads check if any of them
# are done. (also do this when we run out of data, until all the threads are done)
else:
for thread in threads:
if not thread.isAlive():
threads.remove(thread)
Я знаю, что это приведет к тому, что количество потоков будет на 1 меньше, чем у процессоров, что, вероятно, хорошо, и оставляет процессору управление потоками, дисковым вводом-выводом и другими событиями, происходящими на компьютере. Если вы решили, что хотите использовать последнее ядро, просто добавьте его к нему.
редактировать: я думаю, что, возможно, неправильно истолковал цель my_list. Вы не должны my_list
отслеживать потоки вообще (поскольку на них все ссылаются элементы в threads
список). Но это прекрасный способ подачи входных данных процессов - или даже лучше: используйте функцию генератора;)
Цель my_list
а также threads
my_list
содержит данные, которые вам нужно обработать в вашей функции threads
это просто список текущих запущенных потоков
цикл while делает две вещи, запускает новые потоки для обработки данных и проверяет, запущены ли какие-либо потоки.
Так что, если у вас есть (а) больше данных для обработки или (б) потоки, которые еще не завершены... вы хотите, чтобы программа продолжала работать. Как только оба списка будут пусты, они оценят False
и цикл while выйдет
Я бы определенно использовал многопроцессорность вместо того, чтобы использовать собственное решение с использованием подпроцесса.