Как использовать subprocess.Popen для соединения нескольких процессов по каналам?

Как выполнить следующую команду оболочки с использованием Python subprocess модуль?

echo "input data" | awk -f script.awk | sort > outfile.txt

Входные данные будут поступать из строки, поэтому мне на самом деле не нужно echo, У меня так далеко, может кто-нибудь объяснить, как я получаю это через трубу sort тоже?

p_awk = subprocess.Popen(["awk","-f","script.awk"],
                          stdin=subprocess.PIPE,
                          stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )

ОБНОВЛЕНИЕ: Обратите внимание, что хотя принятый ответ ниже фактически не отвечает на поставленный вопрос, я считаю, что С.Лотт прав, и лучше вообще не решать эту проблему!

10 ответов

Решение

Вы были бы немного счастливее со следующим.

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )

Делегировать часть работы в оболочку. Пусть он соединит два процесса с конвейером.

Вы были бы намного счастливее переписать 'script.awk' в Python, исключив awk и конвейер.

Редактировать Некоторые из причин предположить, что awk не помогает.

[Есть слишком много причин, чтобы ответить через комментарии.]

  1. Awk добавляет шаг, который не имеет существенного значения. В обработке awk нет ничего уникального, с чем Python не справляется.

  2. Конвейерная обработка от awk к сортировке для больших наборов данных может сократить время обработки. Для коротких наборов данных это не имеет существенного преимущества. Быстрое измерение awk >file ; sort file а также awk | sort поможет выявить параллелизм С сортировкой это редко помогает, потому что сортировка не является сквозным фильтром.

  3. Простота обработки "Python to sort" (вместо "Python to awk to sort") предотвращает точный вид вопросов, задаваемых здесь.

  4. Python - хотя и более многословный, чем awk - также явный, где awk имеет определенные неявные правила, которые непрозрачны для новичков и вводят в заблуждение неспециалистов.

  5. Awk (как и сам скрипт оболочки) добавляет еще один язык программирования. Если все это можно сделать на одном языке (Python), устранение оболочки и программирование на awk исключают два языка программирования, что позволяет кому-то сосредоточиться на составляющих ценность частях задачи.

Итог: awk не может добавить значительную ценность. В этом случае awk является чистой стоимостью; это добавило достаточно сложности, чтобы пришлось задать этот вопрос. Удаление awk будет чистой прибылью.

Боковая панель Зачем строить трубопровод (a | b) это так сложно.

Когда оболочка сталкивается с a | b это должно сделать следующее.

  1. Разветвите дочерний процесс оригинальной оболочки. Это в конечном итоге станет б.

  2. Сборка ОС трубы. (не подпроцесс Python.PIPE), но вызов os.pipe() который возвращает два новых файловых дескриптора, которые связаны через общий буфер. На этом этапе у процесса есть stdin, stdout, stderr от его родителя, плюс файл, который будет "a's stdout" и "b's stdin".

  3. Вилка ребенка. Ребенок заменяет свой стандартный вывод новым стандартным выводом. Exec the a процесс.

  4. Дочерний объект b закрывает, заменяет свой стандартный ввод новым стандартным b. Exec the b процесс.

  5. Ребенок b ждет завершения.

  6. Родитель ожидает завершения b.

Я думаю, что вышеупомянутое может использоваться рекурсивно, чтобы порождать a | b | c, но вы должны неявно заключить в скобки длинные конвейеры, рассматривая их как a | (b | c),

Так как Python имеет os.pipe(), os.exec() а также os.fork()и вы можете заменить sys.stdin а также sys.stdoutЕсть способ сделать вышеупомянутое в чистом Python. Действительно, вы можете быть в состоянии выработать некоторые ярлыки, используя os.pipe() а также subprocess.Popen,

Однако проще передать эту операцию оболочке.

import subprocess

some_string = b'input_data'

sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in, 
                 stdin=subprocess.PIPE).communicate(some_string)

Чтобы эмулировать конвейер оболочки:

from subprocess import check_call

check_call('echo "input data" | a | b > outfile.txt', shell=True)

без вызова оболочки (см. 17.1.4.2. Замена конвейера оболочки):

#!/usr/bin/env python
from subprocess import Popen, PIPE

a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
    with a.stdout, open("outfile.txt", "wb") as outfile:
        b = Popen(["b"], stdin=a.stdout, stdout=outfile)
    a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already

plumbum обеспечивает некоторый синтаксис сахара:

#!/usr/bin/env python
from plumbum.cmd import a, b # magic

(a << "input data" | b > "outfile.txt")()

Аналог:

#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt

является:

#!/usr/bin/env python
from plumbum.cmd import awk, sort

(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()

Принятый ответ обходит стороной проблему. Вот фрагмент кода, который объединяет выходные данные нескольких процессов: Обратите внимание, что он также печатает (несколько) эквивалентную команду оболочки, чтобы вы могли запустить ее и убедиться, что выходные данные верны.

#!/usr/bin/env python3

from subprocess import Popen, PIPE

# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)

print("Output from last process : " + (p3.communicate()[0]).decode())

# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)

http://web.archive.org/web/20081222144252/http://www.python.org/doc/2.5.2/lib/node535.html освещал это довольно хорошо. Есть ли какая-то часть этого, которую вы не поняли?

Ваша программа была бы очень похожа, но вторая Popen будет иметь stdout = в файл, и вам не понадобится вывод его .communicate(),

Вдохновленный ответом @Cristian. Я встретил точно такую ​​же проблему, но с другой командой. Итак, я помещаю свой проверенный пример, который, я считаю, может быть полезен:

grep_proc = subprocess.Popen(["grep", "rabbitmq"],
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()

Это проверено.

Что сделано

  • Объявлен ленивым grep исполнение со стандартным вводом из трубы. Эта команда будет выполнена на ps выполнение команды, когда труба будет заполнена стандартным выводом ps,
  • Вызывается основной командой ps с выводом, направленным на трубу, используемую grep команда.
  • Grep связался, чтобы получить стандартный вывод из трубы.

Мне нравится этот способ, потому что это естественная концепция труб, аккуратно обернутая subprocess интерфейсы.

Предыдущие ответы упустили важный момент. Как говорит Geocar, замена оболочки конвейера в основном правильная. Это почти достаточно для запуска communicate на последнем элементе трубы.

Оставшаяся проблема - передача входных данных в конвейер. С несколькими подпроцессами, простой communicate(input_data) на последнем элементе не работает - он висит навсегда. Вам нужно вручную создать конвейер и дочерний процесс, например так:

import os
import subprocess

input = """\
input data
more input
""" * 10

rd, wr = os.pipe()
if os.fork() != 0: # parent
    os.close(wr)
else:              # child
    os.close(rd)
    os.write(wr, input)
    os.close(wr)
    exit()

p_awk = subprocess.Popen(["awk", "{ print $2; }"],
                         stdin=rd,
                         stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"], 
                          stdin=p_awk.stdout,
                          stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())

Теперь дочерний элемент предоставляет входные данные через канал, а родительские вызовы connect () работают должным образом. При таком подходе вы можете создавать произвольные длинные конвейеры, не прибегая к "делегированию части работы оболочке". К сожалению, в документации подпроцесса об этом не говорится.

Есть способы добиться того же эффекта без труб:

from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)

Сейчас использую stdin=tf за p_awk, Это вопрос вкуса, что вы предпочитаете.

Вышеуказанное все еще не на 100% эквивалентно bash-конвейерам, потому что обработка сигналов отличается. Вы можете увидеть это, если добавите еще один элемент трубы, который усекает вывод sortнапример, head -n 10, С кодом выше, sort выведет сообщение об ошибке "Сломанная труба" stderr, Вы не увидите это сообщение при запуске того же конвейера в оболочке. (Это единственное отличие, результат в stdout та же). Кажется, причина в том, что питон Popen наборы SIG_IGN за SIGPIPEтогда как оболочка оставляет его на SIG_DFL, а также sortОбработка сигналов в этих двух случаях различна.

РЕДАКТИРОВАТЬ: pipes доступно в Windows, но, что особенно важно, на самом деле не работает в Windows. Смотрите комментарии ниже.

Стандартная библиотека Python теперь включает в себя pipes модуль для обработки этого:

https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html

Я не уверен, как долго этот модуль был вокруг, но этот подход, кажется, намного проще, чем копаться с subprocess,

Для меня приведенный ниже подход является самым чистым и легким для чтения

from subprocess import Popen, PIPE

def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
    with open(output_filename, 'wb') as out_f:
        p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
        p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
        p1.communicate(input=bytes(input_s))
        p1.wait()
        p2.stdin.close()
        p2.wait()

который можно назвать так:

string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')

(Я знаю, что это нарушение протокола, но у меня недостаточно репутации, чтобы опубликовать это в качестве комментария, и публикация его в качестве нового вопроса также может показаться неправильной.)

Это дополнительный вопрос к ответу @Omry Yadan от 9 декабря 2018 г. Мне нужно создать конвейер из трех программ и собрать stderr и коды возврата из всех трех программ. Мое текущее решение (ниже), основанное на этом ответе от 9 декабря 2018 г., зависает. Эквивалентная команда, вставленная в оболочку, быстро завершается.

Объем данных, передаваемых из стандартного вывода в стандартный ввод, находится в области мегабайт. Окончательный stdout, а также три stderrs, как ожидается, будут намного меньше.

      #!/usr/bin/env python3

cmd1 = ["gzip", "-dc", "some_file.gz"]
cmd2 = ["filtering_program", "some", "arguments"]
cmd3 = ["collection_program", "some", "arguments"]

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE)
p2 = Popen(cmd1, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)]
p3 = Popen(cmd1, stdin=p2.stdout, stdout=PIPE, stderr=PIPE)]
(outcome_stdout, outcome_stderr) = p3.communicate()
p1.wait()
p2.wait()
Другие вопросы по тегам