os.chdir между несколькими процессами Python

У меня есть сложный конвейер Python (код, который я не могу изменить), вызывая несколько других сценариев и других исполняемых файлов. Дело в том, что для запуска более 8000 каталогов требуется несколько научных исследований. Итак, я написал простую оболочку (возможно, не самую эффективную, но, кажется, работает) с использованием модуля многопроцессорной обработки.

from os import path, listdir, mkdir, system
from os.path import join as osjoin, exists, isfile
from GffTools import Gene, Element, Transcript
from GffTools import read as gread, write as gwrite, sort as gsort
from re import match
from multiprocessing import JoinableQueue, Process
from sys import argv, exit

# some absolute paths
inbase = "/.../abfgp_in"
outbase = "/.../abfgp_out"
abfgp_cmd = "python /.../abfgp-2.rev/abfgp.py"
refGff = "/.../B0510_manual_reindexed_noSeq.gff"

# the Queue
Q = JoinableQueue()
i = 0

# define number of processes
try: num_p = int(argv[1])
except ValueError: exit("Wrong CPU argument")

# This is the function calling the abfgp.py script, which in its turn calls alot of third party software
def abfgp(id_, pid):
    out = osjoin(outbase, id_)
    if not exists(out): mkdir(out)

    # logfile
    log = osjoin(outbase, "log_process_%s" %(pid))
    try:
        # call the script
        system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))
    except:
        print "ABFGP FAILED"
        return

# parse the output
def extractGff(id_):
   # code not relevant


# function called by multiple processes, using the Queue
def run(Q, pid):
    while not Q.empty():
        try:
            d = Q.get()             
            print "%s\t=>>\t%s" %(str(i-Q.qsize()), d)          
            abfgp(d, pid)
            Q.task_done()
        except KeyboardInterrupt:
            exit("Interrupted Child")

# list of directories
genedirs = [d for d in listdir(inbase)]
genes = gread(refGff)
for d in genedirs:
    i += 1
    indir = osjoin(inbase, d)
    outdir = osjoin(outbase, d)
    Q.put(d)

# this loop creates the multiple processes
procs = []
for pid in range(num_p):
    try:
        p = Process(target=run, args=(Q, pid+1))
        p.daemon = True
        procs.append(p) 
        p.start()
    except KeyboardInterrupt:
        print "Aborting start of child processes"
        for x in procs:
            x.terminate()
        exit("Interrupted")     

try:
    for p in procs:
        p.join()
except:
    print "Terminating child processes"
    for x in procs:
        x.terminate()
    exit("Interrupted")

print "Parsing output..."
for d in genedirs: extractGff(d)

Теперь проблема в том, что abfgp.py использует функцию os.chdir, которая, кажется, нарушает параллельную обработку. Я получаю много ошибок, утверждая, что некоторые (входные / выходные) файлы / каталоги не могут быть найдены для чтения / записи. Несмотря на то, что я вызываю сценарий через os.system(), из-за которого я, хотя и порождаю отдельные процессы, предотвратил бы это.

Как я могу обойти эти чды помехи?

Изменить: я мог бы изменить os.system () на subprocess.Popen(cwd="...") с правильным каталогом. Я надеюсь, что это имеет значение.

Благодарю.

2 ответа

Решение

Редактировать 2

Не использовать os.system() использование subprocess.call()

system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))

будет переводить на

subprocess.call((abfgp_cmd, '--dna', osjoin(inbase, id_, id_ +".dna.fa"), '--multifasta', osjoin(inbase, id_, "informants.mfa"), '--target', id_, '-o', out, '-q')) # without log.

Редактировать 1 Я думаю, что проблема заключается в том, что многопроцессорная обработка использует имена модулей для сериализации функций, классов.

Это означает, что если вы делаете import module где находится модуль ./module.py и вы делаете что-то вроде os.chdir('./dir') теперь вам нужно from .. import module,

Дочерние процессы наследуют папку родительского процесса. Это может быть проблемой.

Решения

  1. Убедитесь, что все модули импортированы (в дочерних процессах) и после этого вы меняете каталог
  2. вставить оригинал os.getcwd() в sys.path включить импорт из исходного каталога. Это должно быть сделано до вызова каких-либо функций из локального каталога.
  3. поместите все функции, которые вы используете, в каталог, который всегда можно импортировать. site-packages может быть такой каталог. Тогда вы можете сделать что-то вроде import modulemodule.main() чтобы начать то, что вы делаете.
  4. Это хак, потому что я знаю, как работает маринад. Используйте это только в случае неудачи других попыток. Скрипт печатает:

    serialized # the function runD is serialized
    string executed # before the function is loaded the code is executed
    loaded # now the function run is deserialized
    run # run is called
    

    В вашем случае вы бы сделали что-то вроде этого:

    runD = evalBeforeDeserialize('__import__("sys").path.append({})'.format(repr(os.getcwd())), run)
    p = Process(target=runD, args=(Q, pid+1))
    

    Это скрипт:

    # functions that you need
    
    class R(object):
        def __init__(self, call, *args):
    
            self.ret = (call, args)
        def __reduce__(self):
            return self.ret
        def __call__(self, *args, **kw):
            raise NotImplementedError('this should never be called')
    
    class evalBeforeDeserialize(object):
        def __init__(self, string, function):
            self.function = function
            self.string = string
        def __reduce__(self):
            return R(getattr, tuple, '__getitem__'), \
                     ((R(eval, self.string), self.function), -1)
    
    # code to show how it works        
    
    def printing():
        print('string executed')
    
    def run():
        print('run')
    
    runD = evalBeforeDeserialize('__import__("__main__").printing()', run)
    
    import pickle
    
    s = pickle.dumps(runD)
    print('serialized')
    run2 = pickle.loads(s)
    print('loaded')
    run2()
    

Пожалуйста, сообщите, если они не работают.

Вы можете определить, какой экземпляр os библиотека, которую использует неизменяемая программа; затем создать адаптированную версию chdir в той библиотеке, которая делает то, что вам нужно - предотвратить изменение каталога, зарегистрировать его, что угодно. Если настраиваемое поведение должно быть только для одной программы, вы можете использовать inspect модуль для идентификации вызывающего абонента и индивидуального определения поведения именно для этого вызывающего абонента.

Ваши возможности ограничены, если вы действительно не можете изменить существующую программу; но если у вас есть возможность изменять импортируемые библиотеки, что-то вроде этого может быть наименее инвазивным способом избежать нежелательного поведения.

Обычные предостережения применяются при изменении стандартной библиотеки.

Другие вопросы по тегам