os.chdir между несколькими процессами Python
У меня есть сложный конвейер Python (код, который я не могу изменить), вызывая несколько других сценариев и других исполняемых файлов. Дело в том, что для запуска более 8000 каталогов требуется несколько научных исследований. Итак, я написал простую оболочку (возможно, не самую эффективную, но, кажется, работает) с использованием модуля многопроцессорной обработки.
from os import path, listdir, mkdir, system
from os.path import join as osjoin, exists, isfile
from GffTools import Gene, Element, Transcript
from GffTools import read as gread, write as gwrite, sort as gsort
from re import match
from multiprocessing import JoinableQueue, Process
from sys import argv, exit
# some absolute paths
inbase = "/.../abfgp_in"
outbase = "/.../abfgp_out"
abfgp_cmd = "python /.../abfgp-2.rev/abfgp.py"
refGff = "/.../B0510_manual_reindexed_noSeq.gff"
# the Queue
Q = JoinableQueue()
i = 0
# define number of processes
try: num_p = int(argv[1])
except ValueError: exit("Wrong CPU argument")
# This is the function calling the abfgp.py script, which in its turn calls alot of third party software
def abfgp(id_, pid):
out = osjoin(outbase, id_)
if not exists(out): mkdir(out)
# logfile
log = osjoin(outbase, "log_process_%s" %(pid))
try:
# call the script
system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))
except:
print "ABFGP FAILED"
return
# parse the output
def extractGff(id_):
# code not relevant
# function called by multiple processes, using the Queue
def run(Q, pid):
while not Q.empty():
try:
d = Q.get()
print "%s\t=>>\t%s" %(str(i-Q.qsize()), d)
abfgp(d, pid)
Q.task_done()
except KeyboardInterrupt:
exit("Interrupted Child")
# list of directories
genedirs = [d for d in listdir(inbase)]
genes = gread(refGff)
for d in genedirs:
i += 1
indir = osjoin(inbase, d)
outdir = osjoin(outbase, d)
Q.put(d)
# this loop creates the multiple processes
procs = []
for pid in range(num_p):
try:
p = Process(target=run, args=(Q, pid+1))
p.daemon = True
procs.append(p)
p.start()
except KeyboardInterrupt:
print "Aborting start of child processes"
for x in procs:
x.terminate()
exit("Interrupted")
try:
for p in procs:
p.join()
except:
print "Terminating child processes"
for x in procs:
x.terminate()
exit("Interrupted")
print "Parsing output..."
for d in genedirs: extractGff(d)
Теперь проблема в том, что abfgp.py использует функцию os.chdir, которая, кажется, нарушает параллельную обработку. Я получаю много ошибок, утверждая, что некоторые (входные / выходные) файлы / каталоги не могут быть найдены для чтения / записи. Несмотря на то, что я вызываю сценарий через os.system(), из-за которого я, хотя и порождаю отдельные процессы, предотвратил бы это.
Как я могу обойти эти чды помехи?
Изменить: я мог бы изменить os.system () на subprocess.Popen(cwd="...") с правильным каталогом. Я надеюсь, что это имеет значение.
Благодарю.
2 ответа
Редактировать 2
Не использовать os.system()
использование subprocess.call()
system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))
будет переводить на
subprocess.call((abfgp_cmd, '--dna', osjoin(inbase, id_, id_ +".dna.fa"), '--multifasta', osjoin(inbase, id_, "informants.mfa"), '--target', id_, '-o', out, '-q')) # without log.
Редактировать 1 Я думаю, что проблема заключается в том, что многопроцессорная обработка использует имена модулей для сериализации функций, классов.
Это означает, что если вы делаете import module
где находится модуль ./module.py
и вы делаете что-то вроде os.chdir('./dir')
теперь вам нужно from .. import module
,
Дочерние процессы наследуют папку родительского процесса. Это может быть проблемой.
Решения
- Убедитесь, что все модули импортированы (в дочерних процессах) и после этого вы меняете каталог
- вставить оригинал
os.getcwd()
вsys.path
включить импорт из исходного каталога. Это должно быть сделано до вызова каких-либо функций из локального каталога. - поместите все функции, которые вы используете, в каталог, который всегда можно импортировать.
site-packages
может быть такой каталог. Тогда вы можете сделать что-то вродеimport module
module.main()
чтобы начать то, что вы делаете. Это хак, потому что я знаю, как работает маринад. Используйте это только в случае неудачи других попыток. Скрипт печатает:
serialized # the function runD is serialized string executed # before the function is loaded the code is executed loaded # now the function run is deserialized run # run is called
В вашем случае вы бы сделали что-то вроде этого:
runD = evalBeforeDeserialize('__import__("sys").path.append({})'.format(repr(os.getcwd())), run) p = Process(target=runD, args=(Q, pid+1))
Это скрипт:
# functions that you need class R(object): def __init__(self, call, *args): self.ret = (call, args) def __reduce__(self): return self.ret def __call__(self, *args, **kw): raise NotImplementedError('this should never be called') class evalBeforeDeserialize(object): def __init__(self, string, function): self.function = function self.string = string def __reduce__(self): return R(getattr, tuple, '__getitem__'), \ ((R(eval, self.string), self.function), -1) # code to show how it works def printing(): print('string executed') def run(): print('run') runD = evalBeforeDeserialize('__import__("__main__").printing()', run) import pickle s = pickle.dumps(runD) print('serialized') run2 = pickle.loads(s) print('loaded') run2()
Пожалуйста, сообщите, если они не работают.
Вы можете определить, какой экземпляр os
библиотека, которую использует неизменяемая программа; затем создать адаптированную версию chdir
в той библиотеке, которая делает то, что вам нужно - предотвратить изменение каталога, зарегистрировать его, что угодно. Если настраиваемое поведение должно быть только для одной программы, вы можете использовать inspect
модуль для идентификации вызывающего абонента и индивидуального определения поведения именно для этого вызывающего абонента.
Ваши возможности ограничены, если вы действительно не можете изменить существующую программу; но если у вас есть возможность изменять импортируемые библиотеки, что-то вроде этого может быть наименее инвазивным способом избежать нежелательного поведения.
Обычные предостережения применяются при изменении стандартной библиотеки.