Subprocess.Popen: клонирование stdout и stderr как для терминала, так и для переменных
Можно ли изменить код ниже, чтобы иметь распечатку из 'stdout' и 'stderr':
- распечатывается на терминале (в режиме реального времени),
- и, наконец, хранятся в переменные аутов и ошибок?
Код:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import subprocess
def run_cmd(command, cwd=None):
p = subprocess.Popen(command, cwd=cwd, shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
outs, errs = p.communicate()
rc = p.returncode
outs = outs.decode('utf-8')
errs = errs.decode('utf-8')
return (rc, (outs, errs))
Спасибо @unutbu, отдельное спасибо за @jf-sebastian, финальная функция:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
def read_output(pipe, funcs):
for line in iter(pipe.readline, b''):
for func in funcs:
func(line.decode('utf-8'))
pipe.close()
def write_output(get):
for line in iter(get, None):
sys.stdout.write(line)
def run_cmd(command, cwd=None, passthrough=True):
outs, errs = None, None
proc = Popen(
command,
cwd=cwd,
shell=False,
close_fds=True,
stdout=PIPE,
stderr=PIPE,
bufsize=1
)
if passthrough:
outs, errs = [], []
q = Queue()
stdout_thread = Thread(
target=read_output, args=(proc.stdout, [q.put, outs.append])
)
stderr_thread = Thread(
target=read_output, args=(proc.stderr, [q.put, errs.append])
)
writer_thread = Thread(
target=write_output, args=(q.get,)
)
for t in (stdout_thread, stderr_thread, writer_thread):
t.daemon = True
t.start()
proc.wait()
for t in (stdout_thread, stderr_thread):
t.join()
q.put(None)
outs = ' '.join(outs)
errs = ' '.join(errs)
else:
outs, errs = proc.communicate()
outs = '' if outs == None else outs.decode('utf-8')
errs = '' if errs == None else errs.decode('utf-8')
rc = proc.returncode
return (rc, (outs, errs))
4 ответа
Вы можете создавать потоки для чтения каналов stdout и stderr, записи в общую очередь и добавления в списки. Затем используйте третий поток для печати элементов из очереди.
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE
def read_output(pipe, funcs):
for line in iter(pipe.readline, ''):
for func in funcs:
func(line)
# time.sleep(1)
pipe.close()
def write_output(get):
for line in iter(get, None):
sys.stdout.write(line)
process = subprocess.Popen(
['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
tout = threading.Thread(
target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
target=read_output, args=(process.stderr, [q.put, err.append]))
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
t.daemon = True
t.start()
process.wait()
for t in (tout, terr):
t.join()
q.put(None)
print(out)
print(err)
Причина использования третьего потока - вместо того, чтобы первые два потока печатали непосредственно на терминале - состоит в том, чтобы предотвратить одновременное выполнение обоих операторов печати, что может привести к искажению текста.
Вышеуказанные звонки random_print.py
, который печатает в stdout и stderr в произвольном порядке:
import sys
import time
import random
for i in range(50):
f = random.choice([sys.stdout,sys.stderr])
f.write(str(i)+'\n')
f.flush()
time.sleep(0.1)
Это решение заимствует код и идеи от JF Sebastian, здесь.
Вот альтернативное решение для Unix-подобных систем, использующее select.select
:
import collections
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE
def make_async(fd):
# https://stackru.com/a/7730201/190597
'''add the O_NONBLOCK flag to a file descriptor'''
fcntl.fcntl(
fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
def read_async(fd):
# https://stackru.com/a/7730201/190597
'''read some data from a file descriptor, ignoring EAGAIN errors'''
# time.sleep(1)
try:
return fd.read()
except IOError, e:
if e.errno != errno.EAGAIN:
raise e
else:
return ''
def write_output(fds, outmap):
for fd in fds:
line = read_async(fd)
sys.stdout.write(line)
outmap[fd.fileno()].append(line)
process = subprocess.Popen(
['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)
make_async(process.stdout)
make_async(process.stderr)
outmap = collections.defaultdict(list)
while True:
rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
write_output(rlist, outmap)
if process.poll() is not None:
write_output([process.stdout, process.stderr], outmap)
break
fileno = {'stdout': process.stdout.fileno(),
'stderr': process.stderr.fileno()}
print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])
Это решение использует код и идеи из поста Адама Розенфилда, здесь.
Чтобы захватывать и отображать одновременно stdout и stderr из дочернего процесса построчно в одном потоке, вы можете использовать асинхронный ввод-вывод:
#!/usr/bin/env python3
import asyncio
import os
import sys
from asyncio.subprocess import PIPE
@asyncio.coroutine
def read_stream_and_display(stream, display):
"""Read from stream line by line until EOF, display, and capture the lines.
"""
output = []
while True:
line = yield from stream.readline()
if not line:
break
output.append(line)
display(line) # assume it doesn't block
return b''.join(output)
@asyncio.coroutine
def read_and_display(*cmd):
"""Capture cmd's stdout, stderr while displaying them as they arrive
(line by line).
"""
# start process
process = yield from asyncio.create_subprocess_exec(*cmd,
stdout=PIPE, stderr=PIPE)
# read child's stdout/stderr concurrently (capture and display)
try:
stdout, stderr = yield from asyncio.gather(
read_stream_and_display(process.stdout, sys.stdout.buffer.write),
read_stream_and_display(process.stderr, sys.stderr.buffer.write))
except Exception:
process.kill()
raise
finally:
# wait for the process to exit
rc = yield from process.wait()
return rc, stdout, stderr
# run the event loop
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
rc, *output = loop.run_until_complete(read_and_display(*cmd))
loop.close()
Для потоковой передачи вывода в реальном времени (stdout и stderr) подпроцесса на терминал, а также в переменные, вы можете создать два потока для одновременной обработки потоков.
Адаптировано из моего более подробного ответа :
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def stream_command(
args,
*,
stdout_handler=logging.info,
stderr_handler=logging.error,
check=True,
text=True,
stdout=PIPE,
stderr=PIPE,
**kwargs,
):
"""Mimic subprocess.run, while processing the command output in real time."""
with Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
with ThreadPoolExecutor(2) as pool: # two threads to handle the streams
exhaust = partial(pool.submit, partial(deque, maxlen=0))
exhaust(stdout_handler(line[:-1]) for line in process.stdout)
exhaust(stderr_handler(line[:-1]) for line in process.stderr)
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args)
return CompletedProcess(process.args, retcode)
Вызов с пользовательскими обработчиками:
outs, errs = [], []
def stdout_handler(line):
outs.append(line)
print(line)
def stderr_handler(line):
errs.append(line)
print(line)
stream_command(
["echo", "test"],
stdout_handler=stdout_handler,
stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']
Вот еще одна версия для вашегоrun_cmd
функция с использованиемasyncio
в Python 3.11:
import asyncio
import io
import sys
from subprocess import SubprocessError
# Maximum number of bytes to read at once from the 'asyncio.subprocess.PIPE'
_MAX_BUFFER_CHUNK_SIZE = 1024
async def run_cmd_async(command, cwd=None, check=False):
stdout_buffer = io.BytesIO()
stderr_buffer = io.BytesIO()
process = await asyncio.subprocess.create_subprocess_exec(
*command,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
async def write_stdout() -> None:
assert process.stdout is not None
while chunk := await process.stdout.read(_MAX_BUFFER_CHUNK_SIZE):
stdout_buffer.write(chunk)
print(chunk.decode(), end="", flush=True)
async def write_stderr() -> None:
assert process.stderr is not None
while chunk := await process.stderr.read(_MAX_BUFFER_CHUNK_SIZE):
stderr_buffer.write(chunk)
print(chunk.decode(), file=sys.stderr, end="", flush=True)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(write_stdout())
task_group.create_task(write_stderr())
exit_code = await process.wait()
if check and exit_code != 0:
raise SubprocessError(
f"Command '{command}' returned non-zero exit status {exit_code}."
)
return exit_code, (stdout_buffer.getvalue().decode(),
stderr_buffer.getvalue().decode())
def run_cmd(command, cwd=None, check=False):
return asyncio.run(run_cmd_async(command, cwd=cwd, check=check))