Несколько труб в подпроцессе
Я пытаюсь использовать Sailfish, который принимает несколько файлов fastq в качестве аргументов, в конвейере Ruffus. Я выполняю Sailfish с помощью модуля подпроцесса в Python, но <()
в вызове подпроцесса не работает даже когда я установил shell=True
,
Это команда, которую я хочу выполнить с помощью Python:
sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file]
или (предпочтительно):
sailfish quant [options] -1 <(gunzip sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip sample2a.fastq.gz sample2b.fastq.gz) -o [output_file]
Обобщение:
someprogram <(someprocess) <(someprocess)
Как мне поступить в Python? Подпроцесс - это правильный подход?
2 ответа
Чтобы эмулировать замену процесса bash:
#!/usr/bin/env python
from subprocess import check_call
check_call('someprogram <(someprocess) <(anotherprocess)',
shell=True, executable='/bin/bash')
В Python вы можете использовать именованные каналы:
#!/usr/bin/env python
from subprocess import Popen
with named_pipes(n=2) as paths:
someprogram = Popen(['someprogram'] + paths)
processes = []
for path, command in zip(paths, ['someprocess', 'anotherprocess']):
with open(path, 'wb', 0) as pipe:
processes.append(Popen(command, stdout=pipe, close_fds=True))
for p in [someprogram] + processes:
p.wait()
где named_pipes(n)
является:
import os
import shutil
import tempfile
from contextlib import contextmanager
@contextmanager
def named_pipes(n=1):
dirname = tempfile.mkdtemp()
try:
paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
for path in paths:
os.mkfifo(path)
yield paths
finally:
shutil.rmtree(dirname)
Другой и более предпочтительный способ (не нужно создавать именованную запись на диске) для реализации подстановки процесса bash заключается в использовании /dev/fd/N
имена файлов (если они доступны), как предложено @Dunes. На FreeBSD, fdescfs(5)
( /dev/fd/#
) создает записи для всех файловых дескрипторов, открытых процессом. Чтобы проверить доступность, запустите:
$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available
Если это не удается; попробуй симлинк /dev/fd
в proc(5)
как это делается на некоторых Linux:
$ ln -s /proc/self/fd /dev/fd
Вот /dev/fd
на основе реализации someprogram <(someprocess) <(anotherprocess)
команда bash:
#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE
def kill(process):
if process.poll() is None: # still running
process.kill()
with ExitStack() as stack: # for proper cleanup
processes = []
for command in [['someprocess'], ['anotherprocess']]: # start child processes
processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
stack.callback(kill, processes[-1]) # kill on someprogram exit
fds = [p.stdout.fileno() for p in processes]
someprogram = stack.enter_context(
Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
for p in processes: # close pipes in the parent
p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0: # errors shouldn't go unnoticed
raise CalledProcessError(someprogram.returncode, someprogram.args)
Примечание: на моей машине с Ubuntu subprocess
код работает только в Python 3.4+, несмотря на pass_fds
быть доступным с Python 3.2.
Хотя JF Sebastian предоставил ответ, используя именованные каналы, это можно сделать с помощью анонимных каналов.
import shlex
from subprocess import Popen, PIPE
inputcmd0 = "zcat hello.gz" # gzipped file containing "hello"
inputcmd1 = "zcat world.gz" # gzipped file containing "world"
def get_filename(file_):
return "/dev/fd/{}".format(file_.fileno())
def get_stdout_fds(*processes):
return tuple(p.stdout.fileno() for p in processes)
# setup producer processes
inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE)
inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE)
# setup consumer process
# pass input processes pipes by "filename" eg. /dev/fd/5
cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout),
file1=get_filename(inputproc1.stdout))
print("command is:", cmd)
# pass_fds argument tells Popen to let the child process inherit the pipe's fds
someprogram = Popen(shlex.split(cmd), stdout=PIPE,
pass_fds=get_stdout_fds(inputproc0, inputproc1))
output, error = someprogram.communicate()
for p in [inputproc0, inputproc1, someprogram]:
p.wait()
assert output == b"hello\nworld\n"