Оперативный вывод команды подпроцесса
Я использую скрипт Python в качестве драйвера для гидродинамического кода. Когда приходит время запустить симуляцию, я использую subprocess.Popen
чтобы запустить код, соберите выходные данные из stdout и stderr в subprocess.PIPE
--- тогда я могу распечатать (и сохранить в лог-файл) выходную информацию и проверить на наличие ошибок. Проблема в том, что я понятия не имею, как продвигается код. Если я запускаю его непосредственно из командной строки, он дает мне вывод о том, в какой итерации он находится, во сколько, каков следующий временной шаг и т. Д.
Есть ли способ как сохранить выходные данные (для ведения журнала и проверки ошибок), так и для вывода в реальном времени?
Соответствующий раздел моего кода:
ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
print "RUN failed\n\n%s\n\n" % (errors)
success = False
if( errors ): log_file.write("\n\n%s\n\n" % errors)
Первоначально я обжигала run_command
через tee
так что копия отправлялась прямо в лог-файл, а поток по-прежнему выводился прямо на терминал - но таким образом я не могу хранить никаких ошибок (насколько мне известно).
Редактировать:
Временное решение:
ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
log_file.flush()
затем в другом терминале запустите tail -f log.txt
(ул log_file = 'log.txt'
).
26 ответов
У вас есть два способа сделать это: создать итератор из read
или же readline
функции и делают:
import subprocess
import sys
with open('test.log', 'w') as f:
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for c in iter(lambda: process.stdout.read(1), ''): # replace '' with b'' for Python 3
sys.stdout.write(c)
f.write(c)
или же
import subprocess
import sys
with open('test.log', 'w') as f:
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in iter(process.stdout.readline, ''): # replace '' with b'' for Python 3
sys.stdout.write(line)
f.write(line)
Или вы можете создать reader
и writer
файл. Пройти writer
к Popen
и читать из reader
import io
import time
import subprocess
import sys
filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
process = subprocess.Popen(command, stdout=writer)
while process.poll() is None:
sys.stdout.write(reader.read())
time.sleep(0.5)
# Read the remaining
sys.stdout.write(reader.read())
Таким образом, вы будете иметь данные, записанные в test.log
а также на стандартный вывод.
Единственным преимуществом файлового подхода является то, что ваш код не блокируется. Таким образом, вы можете делать все, что вы хотите в то же время и читать, когда вы хотите от reader
неблокирующим способом. Когда вы используете PIPE
, read
а также readline
функции будут блокироваться до тех пор, пока один канал не будет записан в канал или строка не будет записана в канал соответственно.
Резюме (или версия "tl;dr"): легко, когда есть не более одного subprocess.PIPE
иначе сложно.
Может быть, пришло время объяснить немного о том, как subprocess.Popen
делает свое дело
(Предостережение: это для Python 2.x, хотя 3.x похож; и я довольно размышляю над вариантом Windows. Я лучше разбираюсь в POSIX).
Popen
функция должна иметь дело с потоками ввода-вывода от нуля до трех, несколько одновременно. Они обозначены stdin
, stdout
, а также stderr
по-прежнему.
Вы можете предоставить:
None
, указывая, что вы не хотите перенаправлять поток. Вместо этого он унаследует их как обычно. Обратите внимание, что в системах POSIX, по крайней мере, это не означает, что он будет использовать Pythonsys.stdout
просто фактический вывод Python; см. демонстрацию в конце.int
значение. Это "сырой" файловый дескриптор (по крайней мере, в POSIX). (Примечание:PIPE
а такжеSTDOUT
на самом делеint
s внутри, но являются "невозможными" дескрипторами, -1 и -2.)- Поток - действительно, любой объект с
fileno
метод.Popen
найдет дескриптор для этого потока, используяstream.fileno()
, а затем продолжить как дляint
значение. subprocess.PIPE
, указывая, что Python должен создать канал.subprocess.STDOUT
(заstderr
только): указать Python использовать тот же дескриптор, что и дляstdout
, Это имеет смысл, только если вы предоставили (неNone
) значение дляstdout
и даже тогда, это нужно только если вы установитеstdout=subprocess.PIPE
, (В противном случае вы можете просто указать тот же аргумент, что и дляstdout
например,Popen(..., stdout=stream, stderr=stream)
.)
Самые простые случаи (без труб)
Если вы ничего не перенаправляете (оставьте все три по умолчанию None
стоимость или предложение явно None
), Pipe
это довольно легко. Нужно просто раскрутить подпроцесс и запустить его. Или, если вы перенаправляете наPIPE
-an int
или поток fileno()
- это все еще легко, поскольку ОС выполняет всю работу. Python просто нужно раскрутить подпроцесс, подключив его stdin, stdout и / или stderr к предоставленным дескрипторам файлов.
Все еще легкий случай: одна труба
Если вы перенаправите только один поток, Pipe
все еще есть вещи довольно легко. Давайте выберем один поток за раз и посмотрим.
Предположим, вы хотите поставить некоторые stdin
, но пусть stdout
а также stderr
перейти без перенаправления или перейти к дескриптору файла. Как родительский процесс, ваша программа на Python должна просто использовать write()
отправить данные по трубе. Вы можете сделать это самостоятельно, например:
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc
или вы можете передать данные STDIN в proc.communicate()
который затем делает stdin.write
показано выше. Там нет выхода возвращаясь так communicate()
есть только одна настоящая работа: он также закрывает для вас трубу. (Если вы не звоните proc.communicate()
ты должен позвонить proc.stdin.close()
закрыть канал, чтобы подпроцесс знал, что больше нет данных, проходящих через него.)
Предположим, вы хотите захватить stdout
но оставь stdin
а также stderr
в одиночестве. Опять же, это просто: просто позвоните proc.stdout.read()
(или эквивалентный), пока нет больше вывода. поскольку proc.stdout()
это обычный поток ввода / вывода Python, в котором вы можете использовать все обычные конструкции, например:
for line in proc.stdout:
или, опять же, вы можете использовать proc.communicate()
, который просто делает read()
для тебя.
Если вы хотите захватить только stderr
работает так же как и с stdout
,
Есть еще один трюк, прежде чем все станет сложно. Предположим, вы хотите захватить stdout
, а также захват stderr
но на той же трубе, что и стандартный вывод:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
В этом случае, subprocess
"читы"! Что ж, он должен это сделать, так что это на самом деле не обман: он запускает подпроцесс как со своим stdout, так и со своим stderr, направленным в (единственный) дескриптор канала, который возвращается обратно в свой родительский (Python) процесс. На родительской стороне снова есть только один дескриптор канала для чтения вывода. Весь вывод "stderr" отображается в proc.stdout
и если вы позвоните proc.communicate()
результат stderr (второе значение в кортеже) будет None
, а не строка.
Жесткие чехлы: две или более труб
Все проблемы возникают, когда вы хотите использовать как минимум две трубы. На самом деле, subprocess
сам код имеет этот бит:
def communicate(self, input=None):
...
# Optimization: If we are only using one pipe, or no pipe at
# all, using select() or threads is unnecessary.
if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
Но, увы, здесь мы сделали как минимум две, а может и три разные трубы, поэтому count(None)
возвращает 1 или 0. Мы должны делать вещи трудным путем.
В Windows это использует threading.Thread
накапливать результаты для self.stdout
а также self.stderr
и имеет родительский поток доставить self.stdin
входные данные (а затем закрыть канал).
На POSIX это использует poll
если доступно, в противном случае select
, чтобы накапливать вывод и доставлять ввод ввода. Все это выполняется в (единственном) родительском процессе / потоке.
Здесь нужны темы или опрос / выборка, чтобы избежать тупика. Предположим, например, что мы перенаправили все три потока на три отдельных канала. Предположим далее, что существует небольшое ограничение на то, сколько данных можно вставить в канал, прежде чем процесс записи будет приостановлен, ожидая, пока процесс чтения "очистит" канал от другого конца. Давайте установим это небольшое ограничение в один байт, просто для иллюстрации. (Это на самом деле, как все работает, за исключением того, что предел намного больше, чем один байт.)
Если родительский (Python) процесс пытается записать несколько байтов, скажем, 'go\n'
в proc.stdin
первый байт входит, а затем второй вызывает приостановку процесса Python, ожидая, пока подпроцесс прочитает первый байт, опустошая канал.
Между тем, предположим, что подпроцесс решает напечатать дружественное "Привет! Не паникуйте!" приветствие. H
идет в свою трубу, но e
вызывает его приостановку, ожидая, что его родитель прочитает H
, опустошая трубу.
Теперь мы застряли: процесс Python спит, ожидая, чтобы закончить, говоря "go", и подпроцесс также спит, ожидая, чтобы закончить, говоря "Hello! Don't Panic!".
subprocess.Popen
Код позволяет избежать этой проблемы с помощью threading-or-select/poll. Когда байты могут пройти по каналам, они идут. Когда они не могут, только поток (не весь процесс) должен спать - или, в случае выбора / опроса, процесс Python ожидает одновременно "может записать" или "данные доступны", записывает в stdin процесса только когда есть место, и читает свои stdout и / или stderr только тогда, когда данные готовы. proc.communicate()
код (на самом деле _communicate
где обрабатываются волосатые случаи) возвращает после того, как все данные стандартного ввода (если таковые имеются) были отправлены и все данные стандартного вывода и / или stderr были собраны.
Если вы хотите прочитать оба stdout
а также stderr
на двух разных трубах (независимо от любого stdin
перенаправление), вам также нужно избегать тупиковых ситуаций. Сценарий взаимоблокировки здесь другой - он возникает, когда подпроцесс записывает что-то в stderr
пока вы извлекаете данные из stdout
или наоборот - но он все еще там.
Демо
Я обещал продемонстрировать, что не перенаправленный Python subprocess
Если вы пишете в стандартный вывод, а не sys.stdout
, Итак, вот код:
from cStringIO import StringIO
import os
import subprocess
import sys
def show1():
print 'start show1'
save = sys.stdout
sys.stdout = StringIO()
print 'sys.stdout being buffered'
proc = subprocess.Popen(['echo', 'hello'])
proc.wait()
in_stdout = sys.stdout.getvalue()
sys.stdout = save
print 'in buffer:', in_stdout
def show2():
print 'start show2'
save = sys.stdout
sys.stdout = open(os.devnull, 'w')
print 'after redirect sys.stdout'
proc = subprocess.Popen(['echo', 'hello'])
proc.wait()
sys.stdout = save
show1()
show2()
Когда запустить:
$ python out.py
start show1
hello
in buffer: sys.stdout being buffered
start show2
hello
Обратите внимание, что первая процедура не удастся, если вы добавите stdout=sys.stdout
, как StringIO
объект не имеет fileno
, Второй будет опускать hello
если вы добавите stdout=sys.stdout
поскольку sys.stdout
был перенаправлен на os.devnull
,
(Если вы перенаправите Python file-descriptor-1, подпроцесс будет следовать этому перенаправлению. open(os.devnull, 'w')
вызов производит поток которого fileno()
больше 2.)
Мы также можем использовать файловый итератор по умолчанию для чтения stdout вместо использования конструкции iter с readline().
import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in process.stdout:
sys.stdout.write(line)
В дополнение ко всем этим ответам, один простой подход может быть следующим:
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
while process.stdout.readable():
line = process.stdout.readline()
if not line:
break
print(line.strip())
Циклически читайте поток до тех пор, пока он читается, и если он получает пустой результат, остановите его.
Ключевым моментом здесь является то, что readline()
возвращает строку (с \n
в конце), пока есть выход и пусто, если это действительно в конце.
Надеюсь, это кому-нибудь поможет.
Если вы можете использовать сторонние библиотеки, вы можете использовать что-то вроде sarge
(раскрытие: я его сопровождающий). Эта библиотека позволяет неблокировать доступ к выходным потокам из подпроцессов - она наслоена на subprocess
модуль.
Если все, что вам нужно, это то, что вывод будет виден на консоли, самым простым решением для меня было передать следующие аргументы в
Popen
with Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) as proc:
который будет использовать ваши скрипты python, обрабатываемые файлом stdio
Решение 1. Журнал stdout
А ТАКЖЕ stderr
одновременно в реальном времени
Простое решение, которое записывает как stdout, так и stderr одновременно, построчно в реальном времени в файл журнала.
import subprocess as sp
from concurrent.futures import ThreadPoolExecutor
def log_popen_pipe(p, stdfile):
with open("mylog.txt", "w") as f:
while p.poll() is None:
f.write(stdfile.readline())
f.flush()
# Write the rest from the buffer
f.write(stdfile.read())
with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
with ThreadPoolExecutor(2) as pool:
r1 = pool.submit(log_popen_pipe, p, p.stdout)
r2 = pool.submit(log_popen_pipe, p, p.stderr)
r1.result()
r2.result()
Решение 2: функция read_popen_pipes()
который позволяет вам перебирать оба канала (stdout/stderr) одновременно в реальном времени
import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
def enqueue_output(file, queue):
for line in iter(file.readline, ''):
queue.put(line)
file.close()
def read_popen_pipes(p):
with ThreadPoolExecutor(2) as pool:
q_stdout, q_stderr = Queue(), Queue()
pool.submit(enqueue_output, p.stdout, q_stdout)
pool.submit(enqueue_output, p.stderr, q_stderr)
while True:
if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
break
out_line = err_line = ''
try:
out_line = q_stdout.get_nowait()
err_line = q_stderr.get_nowait()
except Empty:
pass
yield (out_line, err_line)
# The function in use:
with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
for out_line, err_line in read_popen_pipes(p):
print(out_line, end='')
print(err_line, end='')
return p.poll()
Подобно предыдущим ответам, но следующее решение для меня работало на окнах, использующих Python3, чтобы предоставить общий метод для печати и входа в систему в реальном времени ( https://www.endpoint.com/blog/2015/01/28/getting-realtime-output-using-python):
def print_and_log(command, logFile):
with open(logFile, 'wb') as f:
command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
while True:
output = command.stdout.readline()
if not output and command.poll() is not None:
f.close()
break
if output:
f.write(output)
print(str(output.strip(), 'utf-8'), flush=True)
return command.poll()
Исходя из всего вышесказанного, я предлагаю слегка модифицированную версию (python3):
- цикл read вызывал readline (мне казалось, что предложенное iter-решение навсегда заблокировало меня - Python 3, Windows 7)
- структурирован, поэтому нет необходимости дублировать обработку прочитанных данных после
None
- stderr отправляется в stdout, поэтому оба выходных сигнала читаются
- Добавлен код для получения значения выхода cmd.
Код:
import subprocess
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, universal_newlines=True)
while True:
rd = proc.stdout.readline()
print(rd, end='') # and whatever you want to do...
if not rd: # EOF
returncode = proc.poll()
if returncode is not None:
break
time.sleep(0.1) # cmd closed stdout, but not exited yet
# You may want to check on ReturnCode here
Почему бы не установить stdout
прямо к sys.stdout
? И если вам нужно также вывести в журнал, вы можете просто переопределить метод записи f.
import sys
import subprocess
class SuperFile(open.__class__):
def write(self, data):
sys.stdout.write(data)
super(SuperFile, self).write(data)
f = SuperFile("log.txt","w+")
process = subprocess.Popen(command, stdout=f, stderr=f)
Ни одно из решений Pythonic не сработало для меня. Оказалось, что proc.stdout.read()
или подобное может заблокировать навсегда.
Поэтому я использую tee
как это:
subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')
Это решение удобно, если вы уже используете shell=True
,
${PIPESTATUS}
фиксирует статус успеха всей цепочки команд (доступно только в Bash). Если бы я опустил && exit ${PIPESTATUS}
, то это всегда будет возвращать ноль, так как tee
никогда не подводит
unbuffer
может потребоваться для печати каждой строки непосредственно в терминал, вместо того, чтобы ждать слишком долго, пока "буферный канал" не будет заполнен. Тем не менее, unbuffer проглатывает состояние выхода assert (SIG Abort)...
2>&1
также записывает stderror в файл.
Хорошим, но "тяжеловесным" решением является использование Twisted - см. Дно.
Если вы готовы жить только с чем-то вроде этого, это должно сработать:
import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
stdoutdata = popenobj.stdout.readline()
if stdoutdata:
sys.stdout.write(stdoutdata)
else:
break
print "Return code", popenobj.returncode
(Если вы используете read(), он пытается прочитать весь "файл", который бесполезен, то, что мы действительно могли бы использовать здесь, это то, что читает все данные, которые сейчас находятся в канале)
Можно также попытаться приблизиться к этому с помощью потоков, например:
import subprocess
import sys
import threading
popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)
def stdoutprocess(o):
while True:
stdoutdata = o.stdout.readline()
if stdoutdata:
sys.stdout.write(stdoutdata)
else:
break
t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode
Теперь мы могли бы также добавить stderr, имея два потока.
Однако обратите внимание, что документы подпроцесса не рекомендуют использовать эти файлы напрямую и рекомендуют использовать communicate()
(в основном это касается взаимоблокировок, которые, я думаю, не являются проблемой выше), а решения немного хитрые, поэтому действительно кажется, что модуль подпроцесса не совсем подходит для работы (см. также: http://www.python.org/dev/peps/pep-3145/) и нам нужно взглянуть на что-то еще.
Более сложным решением является использование Twisted, как показано здесь: https://twistedmatrix.com/documents/11.1.0/core/howto/process.html
То, как вы делаете это с помощью Twisted, - это создание вашего процесса с использованием reactor.spawnprocess()
и предоставление ProcessProtocol
то обрабатывает вывод асинхронно. Пример кода Python для Twisted находится здесь: https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py
Я нашел простое решение очень сложной проблемы.
- Как stdout, так и stderr необходимо транслировать.
- Оба они должны быть неблокирующими: когда нет вывода и когда вывода слишком много.
- Не хотите использовать многопроцессорность или многопроцессорность, а также не желаете использовать pexpect.
Это решение использует суть, которую я нашел здесь
import subprocess as sbp
import fcntl
import os
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.readline()
except:
return ""
with sbp.Popen('find / -name fdsfjdlsjf',
shell=True,
universal_newlines=True,
encoding='utf-8',
bufsize=1,
stdout=sbp.PIPE,
stderr=sbp.PIPE) as p:
while True:
out = non_block_read(p.stdout)
err = non_block_read(p.stderr)
if out:
print(out, end='')
if err:
print('E: ' + err, end='')
if p.poll() is not None:
break
Все вышеперечисленные решения, которые я попробовал, не смогли разделить вывод stderr и stdout (несколько каналов) или были заблокированы навсегда, когда буфер канала ОС был заполнен, что происходит, когда команда, которую вы запускаете, выводит слишком быстро (для Python есть предупреждение об этом) опрос () руководство подпроцесса). Единственный надежный способ, который я нашел, был через select, но это решение только для posix:
import subprocess
import sys
import os
import select
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
poller = select.poll()
poller.register(p.stdout, select.POLLIN)
poller.register(p.stderr, select.POLLIN)
coutput=''
cerror=''
fdhup={}
fdhup[p.stdout.fileno()]=0
fdhup[p.stderr.fileno()]=0
while sum(fdhup.values()) < len(fdhup):
try:
r = poller.poll(1)
except select.error, err:
if err.args[0] != EINTR:
raise
r=[]
for fd, flags in r:
if flags & (select.POLLIN | select.POLLPRI):
c = os.read(fd, 1024)
if rtoutput:
sys.stdout.write(c)
sys.stdout.flush()
if fd == p.stderr.fileno():
cerror+=c
else:
coutput+=c
else:
fdhup[fd]=1
return p.poll(), coutput.strip(), cerror.strip()
Я думаю, что subprocess.communicate
метод немного вводит в заблуждение: он фактически заполняет stdout и stderr, которые вы указали в subprocess.Popen
,
Тем не менее, чтение из subprocess.PIPE
что вы можете предоставить subprocess.Popen
Параметры stdout и stderr в конечном итоге заполнят буферы конвейера ОС и блокируют ваше приложение (особенно если у вас несколько процессов / потоков, которые должны использовать subprocess
).
Мое предлагаемое решение состоит в том, чтобы предоставить stdout и stderr файлами - и читать содержимое файлов вместо чтения из взаимоблокировки PIPE
, Эти файлы могут быть tempfile.NamedTemporaryFile()
- которые также могут быть доступны для чтения, пока они записываются subprocess.communicate
,
Ниже приведен пример использования:
try:
with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
for out in process_runner:
print(out)
catch ProcessError as e:
print(e.error_message)
raise
И это исходный код, который можно использовать с как можно большим количеством комментариев, чтобы объяснить, что он делает:
Если вы используете Python 2, убедитесь, что сначала установили последнюю версию пакета subprocess32 из pypi.
import os
import sys
import threading
import time
import tempfile
if os.name == 'posix' and sys.version_info[0] < 3:
# Support python 2
import subprocess32 as subprocess
else:
# Get latest and greatest from python 3
import subprocess
class ProcessError(Exception):
"""Base exception for errors related to running the process"""
class ProcessTimeout(ProcessError):
"""Error that will be raised when the process execution will exceed a timeout"""
class ProcessRunner(object):
def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
"""
Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
Process Runner. This is a class that should be used as a context manager - and that provides an iterator
for reading captured output from subprocess.communicate in near realtime.
Example usage:
try:
with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
for out in process_runner:
print(out)
catch ProcessError as e:
print(e.error_message)
raise
:param args: same as subprocess.Popen
:param env: same as subprocess.Popen
:param timeout: same as subprocess.communicate
:param bufsize: same as subprocess.Popen
:param seconds_to_wait: time to wait between each readline from the temporary file
:param kwargs: same as subprocess.Popen
"""
self._seconds_to_wait = seconds_to_wait
self._process_has_timed_out = False
self._timeout = timeout
self._process_done = False
self._std_file_handle = tempfile.NamedTemporaryFile()
self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
self._thread = threading.Thread(target=self._run_process)
self._thread.daemon = True
def __enter__(self):
self._thread.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._thread.join()
self._std_file_handle.close()
def __iter__(self):
# read all output from stdout file that subprocess.communicate fills
with open(self._std_file_handle.name, 'r') as stdout:
out = stdout.readline()
# If we've reached the sentinel, then it means that we're done with yielding data
while not self._process_done:
out_without_trailing_whitespaces = out.rstrip()
if out_without_trailing_whitespaces:
# yield stdout data without trailing \n
yield out_without_trailing_whitespaces
else:
# if there is nothing to read, then please wait a tiny little bit
time.sleep(self._seconds_to_wait)
# continue reading as long as there is still data coming from stdout
out = stdout.readline()
if self._process_has_timed_out:
raise ProcessTimeout('Process has timed out')
if self._process.returncode != 0:
raise ProcessError('Process has failed')
def _run_process(self):
try:
# Start gathering information (stdout and stderr) from the opened process
self._process.communicate(timeout=self._timeout)
# Graceful termination of the opened process
self._process.terminate()
except subprocess.TimeoutExpired:
self._process_has_timed_out = True
# Force termination of the opened process
self._process.kill()
self._process_done = True
@property
def return_code(self):
return self._process.returncode
Похоже, что выход с буферизацией строки будет работать для вас, в этом случае может подойти что-то вроде следующего. (Предостережение: это не проверено.) Это даст только стандартный вывод подпроцесса в режиме реального времени. Если вы хотите использовать stderr и stdout в режиме реального времени, вам придется сделать что-то более сложное с select
,
proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
line = proc.stdout.readline()
print line
log_file.write(line + '\n')
# Might still be data on stdout at this point. Grab any
# remainder.
for line in proc.stdout.read().split('\n'):
print line
log_file.write(line + '\n')
# Do whatever you want with proc.stderr here...
Имел ту же проблему и разработал простое и чистое решение, используя
process.sdtout.read1()
который отлично подходит для моих нужд в python3.
Вот демонстрация с использованием
ping
команда (требуется подключение к интернету):
from subprocess import Popen, PIPE
cmd = "ping 8.8.8.8"
proc = Popen([cmd], shell=True, stdout=PIPE)
while True:
print(proc.stdout.read1())
Каждую секунду или около того в консоли Python печатается новая строка, поскольку команда ping сообщает свои данные в режиме реального времени.
Вот класс, который я использую в одном из моих проектов. Он перенаправляет вывод подпроцесса в журнал. Сначала я попытался просто переписать метод write, но это не работает, поскольку подпроцесс никогда не вызовет его (перенаправление происходит на уровне файлового дескриптора). Поэтому я использую свой собственный канал, аналогичный тому, как это делается в подпроцесс-модуле. Преимущество заключается в инкапсуляции всей логики логирования / печати в адаптере, и вы можете просто передать экземпляры регистратора в Popen
: subprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))
class LogAdapter(threading.Thread):
def __init__(self, logname, level = logging.INFO):
super().__init__()
self.log = logging.getLogger(logname)
self.readpipe, self.writepipe = os.pipe()
logFunctions = {
logging.DEBUG: self.log.debug,
logging.INFO: self.log.info,
logging.WARN: self.log.warn,
logging.ERROR: self.log.warn,
}
try:
self.logFunction = logFunctions[level]
except KeyError:
self.logFunction = self.log.info
def fileno(self):
#when fileno is called this indicates the subprocess is about to fork => start thread
self.start()
return self.writepipe
def finished(self):
"""If the write-filedescriptor is not closed this thread will
prevent the whole program from exiting. You can use this method
to clean up after the subprocess has terminated."""
os.close(self.writepipe)
def run(self):
inputFile = os.fdopen(self.readpipe)
while True:
line = inputFile.readline()
if len(line) == 0:
#no new data was added
break
self.logFunction(line.strip())
Если вам не нужна регистрация, а просто хотите использовать print()
очевидно, вы можете удалить большие части кода и сделать класс короче. Вы также можете расширить его на __enter__
а также __exit__
метод и вызов finished
в __exit__
так что вы можете легко использовать его в качестве контекста.
import os
def execute(cmd, callback):
for line in iter(os.popen(cmd).readline, ''):
callback(line[:-1])
execute('ls -a', print)
Обработка потока вывода команды в реальном времени может быть достигнута путем итерацииstdout
как subprocess.Popen
бежит.
Эта реализация:
- использует оператор with, так что стандартные файловые дескрипторы закрываются и процесс ожидает
- передает аргументы ключевого слова в конструктор подпроцесса
- по умолчанию
text=True
для автоматического декодирования байтовых строк в строки - поднимает вопрос
CalledProcessError
в случае неудачи, еслиcheck=True
нравитьсяделает - возвращает
CompletedProcess
в случае успеха, какsubprocess.run
делает - использует два потока для одновременной обработки stdout и stderr (версию, перенаправляющую stdout на stderr без потоков, см. в моем упрощенном ответе )
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def stream_command(
args,
*,
stdout_handler=logging.info,
stderr_handler=logging.error,
check=True,
text=True,
stdout=PIPE,
stderr=PIPE,
**kwargs,
):
"""Mimic subprocess.run, while processing the command output in real time."""
with Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
with ThreadPoolExecutor(2) as pool: # two threads to handle the streams
exhaust = partial(pool.submit, partial(deque, maxlen=0))
exhaust(stdout_handler(line[:-1]) for line in process.stdout)
exhaust(stderr_handler(line[:-1]) for line in process.stderr)
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args)
return CompletedProcess(process.args, retcode)
Регистрация в файле становится такой же простой, как и настройка. logging
:
logging.basicConfig(
level=logging.INFO,
filename="./capture.log",
filemode="w",
encoding="utf-8",
)
logging.info("test from python")
stream_command(["echo", "test from subprocess"])
С полученным файлом:
$ cat ./capture.log
INFO:root:test from python
INFO:root:test from subprocess
Поведение можно настроить по своему усмотрению (print
вместоlogging.info
или и то, и другое и т. д.):
stream_command(["echo", "test"])
# INFO:root:test
stream_command("cat ./nonexist", shell=True, check=False)
# ERROR:root:cat: ./nonexist: No such file or directory
stream_command(["echo", "test"], stdout_handler=print)
# test
stdout_lines = []
def handler(line):
print(line)
logging.info(line)
stdout_lines.append(line)
stream_command(["echo", "test"], stdout_handler=handler)
# test
# INFO:root:test
print(stdout_lines)
# ['test']
Это старый пост, но в Python 3 , протестированном в Python 3.11 , у меня работал следующий код для потоковой передачи вывода в реальном времени или в реальном времени с использованиемsubprocess
модуль:
import sys
from os import fdopen
from subprocess import Popen, PIPE, STDOUT
with Popen(command,
shell=True,
stdout=PIPE,
stderr=STDOUT) as sp:
with fdopen(sys.stdout.fileno(), 'wb', closefd=False) as stdout:
for line in sp.stdout:
stdout.write(line)
stdout.flush()
Удобная функция
Поскольку это идиоматично, я обычно создаю удобную функциюrun
для связывания списка команд в терминале и потоковой передачи вывода в режиме реального времени.
Обратите внимание, что я использую&&
в качестве разделителя здесь, но вы можете легко использовать другой, например;
если вы не хотите потерпеть неудачу из-за ошибки или даже&
вместо.
import sys
from os import fdopen
from subprocess import Popen, PIPE, STDOUT
def run(cmds, join='&&'):
with Popen(join.join(cmds),
shell=True,
stdout=PIPE,
stderr=STDOUT) as sp:
with fdopen(sys.stdout.fileno(), 'wb', closefd=False) as stdout:
for line in sp.stdout:
stdout.write(line)
stdout.flush()
Использование такое:
commands = [
'echo hello',
'sleep 3',
'echo world',
'sleep 2',
'echo !',
]
run(commands)
На мой взгляд, «живой вывод из команды подпроцесса» означает, что и stdout, и stderr должны быть активными. И stdin также должен быть доставлен в подпроцесс.
Фрагмент ниже производит оперативный вывод на stdout и stderr, а также фиксирует их как байты в output.{stdout,stderr}.
Хитрость заключается в правильном использовании select и poll.
У меня хорошо работает на Python 3.9.
if self.log == 1:
print(f"** cmnd= {fullCmndStr}")
self.outcome.stdcmnd = fullCmndStr
try:
process = subprocess.Popen(
fullCmndStr,
shell=True,
encoding='utf8',
executable="/bin/bash",
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except OSError:
self.outcome.error = OSError
else:
process.stdin.write(stdin)
process.stdin.close() # type: ignore
stdoutStrFile = io.StringIO("")
stderrStrFile = io.StringIO("")
pollStdout = select.poll()
pollStderr = select.poll()
pollStdout.register(process.stdout, select.POLLIN)
pollStderr.register(process.stderr, select.POLLIN)
stdoutEOF = False
stderrEOF = False
while True:
stdoutActivity = pollStdout.poll(0)
if stdoutActivity:
c= process.stdout.read(1)
if c:
stdoutStrFile.write(c)
if self.log == 1:
sys.stdout.write(c)
else:
stdoutEOF = True
stderrActivity = pollStderr.poll(0)
if stderrActivity:
c= process.stderr.read(1)
if c:
stderrStrFile.write(c)
if self.log == 1:
sys.stderr.write(c)
else:
stderrEOF = True
if stdoutEOF and stderrEOF:
break
if self.log == 1:
print(f"** cmnd={fullCmndStr}")
process.wait() # type: ignore
self.outcome.stdout = stdoutStrFile.getvalue()
self.outcome.stderr = stderrStrFile.getvalue()
self.outcome.error = process.returncode # type: ignore
Единственный способ, которым я нашел, как читать вывод подпроцесса в потоковом режиме (а также записывать его в переменную) в Python (для нескольких выходных потоков, т.е.stdout
иstderr
) заключается в передаче подпроцессу именованного временного файла для записи, а затем открытии того же временного файла в отдельном дескрипторе чтения.
Примечание: это для Python 3
stdout_write = tempfile.NamedTemporaryFile()
stdout_read = io.open(stdout_write.name, "r")
stderr_write = tempfile.NamedTemporaryFile()
stderr_read = io.open(stderr_write.name, "r")
stdout_captured = ""
stderr_captured = ""
proc = subprocess.Popen(["command"], stdout=stdout_write, stderr=stderr_write)
while True:
proc_done: bool = cli_process.poll() is not None
while True:
content = stdout_read.read(1024)
sys.stdout.write(content)
stdout_captured += content
if len(content) < 1024:
break
while True:
content = stderr_read.read(1024)
sys.stderr.write(content)
stdout_captured += content
if len(content) < 1024:
break
if proc_done:
break
time.sleep(0.1)
stdout_write.close()
stdout_read.close()
stderr_write.close()
stderr_read.close()
Однако, если вам не нужно захватывать вывод, вы можете просто передатьsys.stdout
иsys.stderr
потоки из вашего скрипта Python в вызываемый подпроцесс, как предложил xaav в своем ответе :
subprocess.Popen(["command"], stdout=sys.stdout, stderr=sys.stderr)
У меня работает (Python 3.12):
import subprocess
from time import sleep
def run_shell(command):
def printShellOut(process):
out = process.stdout.readline()
if out:
print(out.decode(), flush = True, end='')
return True
def printShellErr(process):
err = process.stderr.readline()
if err:
print('Error: ' + err.decode(), flush = True, end='')
return True
def printShell(process):
while printShellOut(process):
pass
while printShellErr(process):
pass
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while process.poll() is None:
printShell(process)
sleep(0.5)
printShell(process)
Я пойду на конечности здесь и предложу использовать check_call
, Согласно документации, он запускает операцию блокировки и вполне подходит для этой цели.
Запустите команду с аргументами. Дождитесь завершения команды. Если код возврата был равен нулю, вернуть, в противном случае вызовите CalledProcessError. Объект CalledProcessError будет иметь код возврата в атрибуте returncode.
cmd = "some_crazy_long_script.sh"
args = {
'shell': True,
'cwd': if_you_need_it
}
subprocess.check_call(cmd, **args)
import sys
import subprocess
f = open("log.txt","w+")
process = subprocess.Popen(command, stdout=subprocess.PIPE)
for line in iter(process.stdout.readline, ''):
sys.stdout.write(line)
f.write(line.replace("\n",""))
f.close()