Как использовать subprocess.Popen для соединения нескольких процессов по каналам?
Как выполнить следующую команду оболочки с использованием Python subprocess
модуль?
echo "input data" | awk -f script.awk | sort > outfile.txt
Входные данные будут поступать из строки, поэтому мне на самом деле не нужно echo
, У меня так далеко, может кто-нибудь объяснить, как я получаю это через трубу sort
тоже?
p_awk = subprocess.Popen(["awk","-f","script.awk"],
stdin=subprocess.PIPE,
stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )
ОБНОВЛЕНИЕ: Обратите внимание, что хотя принятый ответ ниже фактически не отвечает на поставленный вопрос, я считаю, что С.Лотт прав, и лучше вообще не решать эту проблему!
10 ответов
Вы были бы немного счастливее со следующим.
import subprocess
awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )
Делегировать часть работы в оболочку. Пусть он соединит два процесса с конвейером.
Вы были бы намного счастливее переписать 'script.awk' в Python, исключив awk и конвейер.
Редактировать Некоторые из причин предположить, что awk не помогает.
[Есть слишком много причин, чтобы ответить через комментарии.]
Awk добавляет шаг, который не имеет существенного значения. В обработке awk нет ничего уникального, с чем Python не справляется.
Конвейерная обработка от awk к сортировке для больших наборов данных может сократить время обработки. Для коротких наборов данных это не имеет существенного преимущества. Быстрое измерение
awk >file ; sort file
а такжеawk | sort
поможет выявить параллелизм С сортировкой это редко помогает, потому что сортировка не является сквозным фильтром.Простота обработки "Python to sort" (вместо "Python to awk to sort") предотвращает точный вид вопросов, задаваемых здесь.
Python - хотя и более многословный, чем awk - также явный, где awk имеет определенные неявные правила, которые непрозрачны для новичков и вводят в заблуждение неспециалистов.
Awk (как и сам скрипт оболочки) добавляет еще один язык программирования. Если все это можно сделать на одном языке (Python), устранение оболочки и программирование на awk исключают два языка программирования, что позволяет кому-то сосредоточиться на составляющих ценность частях задачи.
Итог: awk не может добавить значительную ценность. В этом случае awk является чистой стоимостью; это добавило достаточно сложности, чтобы пришлось задать этот вопрос. Удаление awk будет чистой прибылью.
Боковая панель Зачем строить трубопровод (a | b
) это так сложно.
Когда оболочка сталкивается с a | b
это должно сделать следующее.
Разветвите дочерний процесс оригинальной оболочки. Это в конечном итоге станет б.
Сборка ОС трубы. (не подпроцесс Python.PIPE), но вызов
os.pipe()
который возвращает два новых файловых дескриптора, которые связаны через общий буфер. На этом этапе у процесса есть stdin, stdout, stderr от его родителя, плюс файл, который будет "a's stdout" и "b's stdin".Вилка ребенка. Ребенок заменяет свой стандартный вывод новым стандартным выводом. Exec the
a
процесс.Дочерний объект b закрывает, заменяет свой стандартный ввод новым стандартным b. Exec the
b
процесс.Ребенок b ждет завершения.
Родитель ожидает завершения b.
Я думаю, что вышеупомянутое может использоваться рекурсивно, чтобы порождать a | b | c
, но вы должны неявно заключить в скобки длинные конвейеры, рассматривая их как a | (b | c)
,
Так как Python имеет os.pipe()
, os.exec()
а также os.fork()
и вы можете заменить sys.stdin
а также sys.stdout
Есть способ сделать вышеупомянутое в чистом Python. Действительно, вы можете быть в состоянии выработать некоторые ярлыки, используя os.pipe()
а также subprocess.Popen
,
Однако проще передать эту операцию оболочке.
import subprocess
some_string = b'input_data'
sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in,
stdin=subprocess.PIPE).communicate(some_string)
Чтобы эмулировать конвейер оболочки:
from subprocess import check_call
check_call('echo "input data" | a | b > outfile.txt', shell=True)
без вызова оболочки (см. 17.1.4.2. Замена конвейера оболочки):
#!/usr/bin/env python
from subprocess import Popen, PIPE
a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
with a.stdout, open("outfile.txt", "wb") as outfile:
b = Popen(["b"], stdin=a.stdout, stdout=outfile)
a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already
plumbum
обеспечивает некоторый синтаксис сахара:
#!/usr/bin/env python
from plumbum.cmd import a, b # magic
(a << "input data" | b > "outfile.txt")()
Аналог:
#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt
является:
#!/usr/bin/env python
from plumbum.cmd import awk, sort
(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()
Принятый ответ обходит стороной проблему. Вот фрагмент кода, который объединяет выходные данные нескольких процессов: Обратите внимание, что он также печатает (несколько) эквивалентную команду оболочки, чтобы вы могли запустить ее и убедиться, что выходные данные верны.
#!/usr/bin/env python3
from subprocess import Popen, PIPE
# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")
p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)
print("Output from last process : " + (p3.communicate()[0]).decode())
# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)
http://web.archive.org/web/20081222144252/http://www.python.org/doc/2.5.2/lib/node535.html освещал это довольно хорошо. Есть ли какая-то часть этого, которую вы не поняли?
Ваша программа была бы очень похожа, но вторая Popen
будет иметь stdout = в файл, и вам не понадобится вывод его .communicate()
,
Вдохновленный ответом @Cristian. Я встретил точно такую же проблему, но с другой командой. Итак, я помещаю свой проверенный пример, который, я считаю, может быть полезен:
grep_proc = subprocess.Popen(["grep", "rabbitmq"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()
Это проверено.
Что сделано
- Объявлен ленивым
grep
исполнение со стандартным вводом из трубы. Эта команда будет выполнена наps
выполнение команды, когда труба будет заполнена стандартным выводомps
, - Вызывается основной командой
ps
с выводом, направленным на трубу, используемуюgrep
команда. - Grep связался, чтобы получить стандартный вывод из трубы.
Мне нравится этот способ, потому что это естественная концепция труб, аккуратно обернутая subprocess
интерфейсы.
Предыдущие ответы упустили важный момент. Как говорит Geocar, замена оболочки конвейера в основном правильная. Это почти достаточно для запуска communicate
на последнем элементе трубы.
Оставшаяся проблема - передача входных данных в конвейер. С несколькими подпроцессами, простой communicate(input_data)
на последнем элементе не работает - он висит навсегда. Вам нужно вручную создать конвейер и дочерний процесс, например так:
import os
import subprocess
input = """\
input data
more input
""" * 10
rd, wr = os.pipe()
if os.fork() != 0: # parent
os.close(wr)
else: # child
os.close(rd)
os.write(wr, input)
os.close(wr)
exit()
p_awk = subprocess.Popen(["awk", "{ print $2; }"],
stdin=rd,
stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"],
stdin=p_awk.stdout,
stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())
Теперь дочерний элемент предоставляет входные данные через канал, а родительские вызовы connect () работают должным образом. При таком подходе вы можете создавать произвольные длинные конвейеры, не прибегая к "делегированию части работы оболочке". К сожалению, в документации подпроцесса об этом не говорится.
Есть способы добиться того же эффекта без труб:
from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)
Сейчас использую stdin=tf
за p_awk
, Это вопрос вкуса, что вы предпочитаете.
Вышеуказанное все еще не на 100% эквивалентно bash-конвейерам, потому что обработка сигналов отличается. Вы можете увидеть это, если добавите еще один элемент трубы, который усекает вывод sort
например, head -n 10
, С кодом выше, sort
выведет сообщение об ошибке "Сломанная труба" stderr
, Вы не увидите это сообщение при запуске того же конвейера в оболочке. (Это единственное отличие, результат в stdout
та же). Кажется, причина в том, что питон Popen
наборы SIG_IGN
за SIGPIPE
тогда как оболочка оставляет его на SIG_DFL
, а также sort
Обработка сигналов в этих двух случаях различна.
РЕДАКТИРОВАТЬ: pipes
доступно в Windows, но, что особенно важно, на самом деле не работает в Windows. Смотрите комментарии ниже.
Стандартная библиотека Python теперь включает в себя pipes
модуль для обработки этого:
https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html
Я не уверен, как долго этот модуль был вокруг, но этот подход, кажется, намного проще, чем копаться с subprocess
,
Для меня приведенный ниже подход является самым чистым и легким для чтения
from subprocess import Popen, PIPE
def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
with open(output_filename, 'wb') as out_f:
p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
p1.communicate(input=bytes(input_s))
p1.wait()
p2.stdin.close()
p2.wait()
который можно назвать так:
string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')
(Я знаю, что это нарушение протокола, но у меня недостаточно репутации, чтобы опубликовать это в качестве комментария, и публикация его в качестве нового вопроса также может показаться неправильной.)
Это дополнительный вопрос к ответу @Omry Yadan от 9 декабря 2018 г. Мне нужно создать конвейер из трех программ и собрать stderr и коды возврата из всех трех программ. Мое текущее решение (ниже), основанное на этом ответе от 9 декабря 2018 г., зависает. Эквивалентная команда, вставленная в оболочку, быстро завершается.
Объем данных, передаваемых из стандартного вывода в стандартный ввод, находится в области мегабайт. Окончательный stdout, а также три stderrs, как ожидается, будут намного меньше.
#!/usr/bin/env python3
cmd1 = ["gzip", "-dc", "some_file.gz"]
cmd2 = ["filtering_program", "some", "arguments"]
cmd3 = ["collection_program", "some", "arguments"]
p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE)
p2 = Popen(cmd1, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)]
p3 = Popen(cmd1, stdin=p2.stdout, stdout=PIPE, stderr=PIPE)]
(outcome_stdout, outcome_stderr) = p3.communicate()
p1.wait()
p2.wait()