Использование многопроцессорного модуля Python для одновременного и отдельного запуска модели SEAWAT/MODFLOW
Я пытаюсь выполнить 100 запусков моделей на моей 8-процессорной 64-битной машине с Windows 7. Я хотел бы запустить 7 экземпляров модели одновременно, чтобы уменьшить общее время выполнения (около 9,5 мин на запуск модели). Я просмотрел несколько потоков, относящихся к многопроцессорному модулю Python, но все еще чего-то не хватает.
Использование многопроцессорного модуля
Как порождать параллельные дочерние процессы в многопроцессорной системе?
Python Многопроцессорная очередь
Мой процесс:
У меня есть 100 различных наборов параметров, которые я хотел бы запустить через SEAWAT/MODFLOW, чтобы сравнить результаты. Я предварительно собрал входные файлы моделей для каждого запуска модели и сохранил их в своих собственных каталогах. То, что я хотел бы иметь, - это запускать 7 моделей одновременно, пока все реализации не будут завершены. Не должно быть связи между процессами или отображением результатов. До сих пор я мог только порождать модели последовательно:
import os,subprocess
import multiprocessing as mp
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
if f.endswith('.npy'):
files.append(f)
## def work(cmd):
## return subprocess.call(cmd, shell=False)
def run(f,def_param=ws):
real = f.split('_')[2].split('.')[0]
print 'Realization %s' % real
mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
exe = seawatV4x64
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
os.system( exe + swt_nam )
if __name__ == '__main__':
p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
tasks = range(len(files))
results = []
for f in files:
r = p.map_async(run(f), tasks, callback=results.append)
Я изменил if __name__ == 'main':
к следующему в надежде, что это исправит недостаток параллелизма for loop
, Тем не менее, модель не работает даже (без ошибки Python):
if __name__ == '__main__':
p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes
p.map_async(run,((files[f],) for f in range(len(files))))
Любая помощь очень ценится!
РЕДАКТИРОВАТЬ 26.03.2012 13:31 EST
Используя метод "Ручной пул" в ответе @JF Себастьяна ниже, я получаю параллельное выполнение моего внешнего.exe. Реализации моделей вызываются партиями по 8 за раз, но он не ждет завершения этих 8 запусков, прежде чем вызывать следующий пакет и так далее:
from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
def run(f,ws):
real = f.split('_')[-1].split('.')[0]
print('Realization %s' % real)
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
subprocess.check_call([seawatV4x64, swt_nam])
def worker(queue):
"""Process files from the queue."""
for args in iter(queue.get, None):
try:
run(*args)
except Exception as e: # catch exceptions to avoid exiting the
# thread prematurely
print('%r failed: %s' % (args, e,), file=sys.stderr)
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
if __name__ == '__main__':
mp.freeze_support() # optional if the program is not frozen
main()
Нет трассировки ошибок. run()
Функция выполняет свои обязанности при вызове одного файла реализации модели, как и с несколькими файлами. Разница лишь в том, что с несколькими файлами это называется len(files)
время, хотя каждый из экземпляров немедленно закрывается, и только одному прогону модели разрешается завершить, и в этот момент скрипт завершает работу изящно (код выхода 0).
Добавление некоторых операторов печати в main()
показывает некоторую информацию об активных счетчиках потоков, а также о состоянии потоков (обратите внимание, что это тест только на 8 файлах реализации, чтобы сделать снимок экрана более управляемым, теоретически все 8 файлов должны выполняться одновременно, однако поведение продолжается там, где они есть икру и сразу умирает, кроме одного)
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\test')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
print('Active Count a',threading.activeCount())
for _ in threads:
print(_)
q.put_nowait(None) # signal no more files
for t in threads:
print(t)
t.join() # wait for completion
print('Active Count b',threading.activeCount())
** Строка, которая читает "D:\\Data\\Users...
"это информация об ошибке, которая выдается, когда я вручную прекращаю работу модели до ее завершения. Как только я прекращаю работу модели, сообщается об оставшихся строках состояния потока и происходит завершение работы сценария.
РЕДАКТИРОВАТЬ 26.03.2012 16:24 EST
SEAWAT разрешает параллельное выполнение, как я делал это в прошлом, порождая экземпляры вручную, используя iPython и запуская из каждой папки файла модели. На этот раз я запускаю все прогоны модели из одного места, а именно из каталога, в котором находится мой скрипт. Похоже, виновником может быть то, что SEAWAT экономит часть продукции. Когда SEAWAT запущен, он сразу создает файлы, относящиеся к прогону модели. Один из этих файлов сохраняется не в каталог, в котором находится реализация модели, а в верхнем каталоге, где расположен скрипт. Это препятствует тому, чтобы любые последующие потоки сохраняли одно и то же имя файла в том же месте (что они все хотят сделать, так как эти имена файлов являются общими и не специфичными для каждой реализации). Окна SEAWAT не оставались открытыми достаточно долго, чтобы я мог прочитать или даже увидеть, что появилось сообщение об ошибке, я понял это только когда вернулся и попытался запустить код, используя iPython, который напрямую отображает распечатку из SEAWAT вместо открытия Новое окно для запуска программы.
Я принимаю ответ @JF Себастьяна, так как вполне вероятно, что, как только я решу эту проблему, связанную с выполнением модели, предоставленный им многопоточный код доставит меня туда, где я должен быть.
ЗАКЛЮЧИТЕЛЬНЫЙ КОД
Добавлен аргумент cwd в subprocess.check_call для запуска каждого экземпляра SEAWAT в своем собственном каталоге. Очень ключ.
from __future__ import print_function
import os,subprocess,sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading
def run(f,ws):
real = f.split('_')[-1].split('.')[0]
print('Realization %s' % real)
seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
cwd = ws + r'\reals\real%s\ss' % real
swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd)
def worker(queue):
"""Process files from the queue."""
for args in iter(queue.get, None):
try:
run(*args)
except Exception as e: # catch exceptions to avoid exiting the
# thread prematurely
print('%r failed: %s' % (args, e,), file=sys.stderr)
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
q = Queue()
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
# start threads
threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
if __name__ == '__main__':
mp.freeze_support() # optional if the program is not frozen
main()
2 ответа
Я не вижу никаких вычислений в коде Python. Если вам просто нужно выполнить несколько внешних программ параллельно, достаточно использовать subprocess
запустить программы и threading
модуль для поддержания постоянного числа запущенных процессов, но самый простой код использует multiprocessing.Pool
:
#!/usr/bin/env python
import os
import multiprocessing as mp
def run(filename_def_param):
filename, def_param = filename_def_param # unpack arguments
... # call external program on `filename`
def safe_run(*args, **kwargs):
"""Call run(), catch exceptions."""
try: run(*args, **kwargs)
except Exception as e:
print("error: %s run(*%r, **%r)" % (e, args, kwargs))
def main():
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
workdir = os.path.join(ws, r'fieldgen\reals')
files = ((os.path.join(workdir, f), ws)
for f in os.listdir(workdir) if f.endswith('.npy'))
# start processes
pool = mp.Pool() # use all available CPUs
pool.map(safe_run, files)
if __name__=="__main__":
mp.freeze_support() # optional if the program is not frozen
main()
Если есть много файлов, то pool.map()
может быть заменено for _ in pool.imap_unordered(safe_run, files): pass
,
Существует также mutiprocessing.dummy.Pool
который обеспечивает тот же интерфейс, что и multiprocessing.Pool
но использует потоки вместо процессов, которые могут быть более подходящими в этом случае.
Вам не нужно оставлять некоторые процессоры свободными. Просто используйте команду, которая запускает ваши исполняемые файлы с низким приоритетом (в Linux это nice
программа).
ThreadPoolExecutor
пример
concurrent.futures.ThreadPoolExecutor
будет и простым, и достаточным, но для этого потребуется сторонняя зависимость от Python 2.x (он находится в stdlib начиная с Python 3.2).
#!/usr/bin/env python
import os
import concurrent.futures
def run(filename, def_param):
... # call external program on `filename`
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith('.npy'))
# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
future_to_file = dict((executor.submit(run, f, ws), f) for f in files)
for future in concurrent.futures.as_completed(future_to_file):
f = future_to_file[future]
if future.exception() is not None:
print('%r generated an exception: %s' % (f, future.exception()))
# run() doesn't return anything so `future.result()` is always `None`
Или если мы игнорируем исключения, выдвинутые run()
:
from itertools import repeat
... # the same
# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
executor.map(run, files, repeat(ws))
# run() doesn't return anything so `map()` results can be ignored
subprocess
+ threading
(ручной пул) решение
#!/usr/bin/env python
from __future__ import print_function
import os
import subprocess
import sys
from Queue import Queue
from threading import Thread
def run(filename, def_param):
... # define exe, swt_nam
subprocess.check_call([exe, swt_nam]) # run external program
def worker(queue):
"""Process files from the queue."""
for args in iter(queue.get, None):
try:
run(*args)
except Exception as e: # catch exceptions to avoid exiting the
# thread prematurely
print('%r failed: %s' % (args, e,), file=sys.stderr)
# start threads
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
t.daemon = True # threads die if the program dies
t.start()
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
for f in os.listdir(wdir):
if f.endswith('.npy'):
q.put_nowait((os.path.join(wdir, f), ws))
for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion
Вот мой способ сохранить минимальное количество потоков в памяти. Это комбинация потоков и многопроцессорных модулей. Это может быть необычно для других методов, таких как уважаемые коллеги, которые объяснили выше, НО может стоить немалых денег. Для объяснения я использую сценарий сканирования как минимум 5 веб-сайтов одновременно.
так вот оно:
#importing dependencies.
from multiprocessing import Process
from threading import Thread
import threading
# Crawler function
def crawler(domain):
# define crawler technique here.
output.write(scrapeddata + "\n")
pass
Далее идет функция threadController. Эта функция будет контролировать поток потоков в основную память. Он будет продолжать активировать потоки для поддержания "минимального" предела threadNum, т.е. 5. Также он не завершится, пока все активные потоки (acitveCount) не будут завершены.
Он будет поддерживать минимум потоков функции startProcess threadNum(5) (эти потоки в конечном итоге будут запускать процессы из processList, одновременно соединяя их со временем из 60 секунд). После запуска threadController будет 2 потока, которые не включены в вышеуказанный предел в 5, т.е. Основной поток и сам поток threadController. вот почему используется threading.activeCount()!= 2.
def threadController():
print "Thread count before child thread starts is:-", threading.activeCount(), len(processList)
# staring first thread. This will make the activeCount=3
Thread(target = startProcess).start()
# loop while thread List is not empty OR active threads have not finished up.
while len(processList) != 0 or threading.activeCount() != 2:
if (threading.activeCount() < (threadNum + 2) and # if count of active threads are less than the Minimum AND
len(processList) != 0): # processList is not empty
Thread(target = startProcess).start() # This line would start startThreads function as a seperate thread **
Функция startProcess, как отдельный поток, запускает процессы из списка процессов. Назначение этой функции (** запущенной как другой поток) состоит в том, что она станет родительским потоком для процессов. Поэтому, когда он присоединится к ним с тайм-аутом в 60 секунд, это остановит поток startProcess для продвижения вперед, но это не остановит выполнение threadController. Таким образом, ThreadController будет работать как требуется.
def startProcess():
pr = processList.pop(0)
pr.start()
pr.join(60.00) # joining the thread with time out of 60 seconds as a float.
if __name__ == '__main__':
# a file holding a list of domains
domains = open("Domains.txt", "r").read().split("\n")
output = open("test.txt", "a")
processList = [] # thread list
threadNum = 5 # number of thread initiated processes to be run at one time
# making process List
for r in range(0, len(domains), 1):
domain = domains[r].strip()
p = Process(target = crawler, args = (domain,))
processList.append(p) # making a list of performer threads.
# starting the threadController as a seperate thread.
mt = Thread(target = threadController)
mt.start()
mt.join() # won't let go next until threadController thread finishes.
output.close()
print "Done"
Помимо поддержания минимального количества потоков в памяти, моя цель состояла в том, чтобы также иметь что-то, что могло бы избежать застревания потоков или процессов в памяти. Я сделал это с помощью функции тайм-аута. Мои извинения за любую опечатку.
Я надеюсь, что эта конструкция поможет любому в этом мире. С уважением, Викас Гаутам