Неблокирующее чтение на подпроцесс.PIPE в Python
Я использую модуль подпроцесса, чтобы запустить подпроцесс и подключиться к его выходному потоку (stdout). Я хочу иметь возможность выполнять неблокирующие чтения на своем стандартном выводе. Есть ли способ сделать.readline неблокирующим или проверить, есть ли данные в потоке, прежде чем я вызову .readline
? Я хотел бы, чтобы это было переносимо или, по крайней мере, работало под Windows и Linux.
вот как я делаю это сейчас (это блокирует на .readline
если нет данных):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
32 ответа
fcntl
, select
, asyncproc
не поможет в этом случае.
Надежный способ чтения потока без блокировки независимо от операционной системы заключается в использовании Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
У меня часто была похожая проблема; Программы на Python, которые я пишу часто, должны иметь возможность выполнять некоторые основные функции, одновременно принимая пользовательский ввод из командной строки (stdin). Простое помещение функции обработки пользовательского ввода в другой поток не решает проблему, потому что readline()
блокирует и не имеет времени ожидания. Если основная функциональность завершена и больше нет необходимости ждать дальнейшего ввода данных пользователем, я обычно хочу, чтобы моя программа завершала работу, но это не может быть, потому что readline()
все еще блокируется в другом потоке в ожидании строки. Решение, которое я нашел для этой проблемы, состоит в том, чтобы сделать stdin неблокирующим файлом с помощью модуля fcntl:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
На мой взгляд, это немного чище, чем использование модулей select или signal для решения этой проблемы, но опять же, это работает только в UNIX...
В Unix-подобных системах и Python 3.5+ есть os.set_blocking
который делает именно то, что говорит.
import os
import time
import subprocess
cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
# first iteration always produces empty byte string in non-blocking mode
for i in range(2):
line = p.stdout.readline()
print(i, line)
time.sleep(0.5)
if time.time() > start + 5:
break
p.terminate()
Это выводит:
1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'
С os.set_blocking
прокомментировал это:
0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
Python 3.4 представляет новый временный API для асинхронного ввода-вывода - asyncio
модуль.
Подход похож на twisted
на основе ответа @Bryan Ward - определите протокол и его методы вызываются, как только данные готовы:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
См. "Подпроцесс" в документации.
Есть интерфейс высокого уровня asyncio.create_subprocess_exec()
это возвращает Process
объекты, позволяющие асинхронно читать строки, используя StreamReader.readline()
сопрограмма (с async
/ await
Синтаксис Python 3.5+):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
выполняет следующие задачи:
- запустить подпроцесс, перенаправить его стандартный поток в канал
- читать строку из stdout подпроцесса асинхронно
- убить подпроцесс
- дождитесь его выхода
Каждый шаг может быть ограничен тайм-аутом секунд, если это необходимо.
Попробуйте модуль asyncproc. Например:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
Модуль заботится обо всех потоках, как это было предложено S.Lott.
Вы можете сделать это очень легко в Twisted. В зависимости от вашей существующей кодовой базы это может быть не так просто использовать, но если вы создаете скрученное приложение, такие вещи становятся почти тривиальными. Вы создаете ProcessProtocol
класс и переопределить outReceived()
метод. Скрученный (в зависимости от используемого реактора) обычно просто большой select()
цикл с установленными обратными вызовами для обработки данных из разных файловых дескрипторов (часто сетевых сокетов). Итак outReceived()
Метод просто устанавливает обратный вызов для обработки данных, поступающих из STDOUT
, Простой пример, демонстрирующий это поведение:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
В документации Twisted есть хорошая информация об этом.
Если вы строите свое приложение целиком на Twisted, оно делает асинхронную связь с другими процессами, локальными или удаленными, действительно элегантной. С другой стороны, если ваша программа не построена поверх Twisted, это не очень полезно. Надеемся, что это может быть полезно для других читателей, даже если это не применимо для вашего конкретного приложения.
Используйте select & read(1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Для readline()- как:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
В современном Python дела обстоят намного лучше.
Вот простая дочерняя программа hello.py:
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
И программа для взаимодействия с ним:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
Это распечатывает:
b'hello bob\n'
b'hello alice\n'
Обратите внимание, что фактический шаблон, который также присутствует почти во всех предыдущих ответах, как здесь, так и в связанных вопросах, заключается в том, чтобы установить дескриптор файла stdout дочернего элемента на неблокирующий, а затем опросить его в некотором цикле выбора. В наши дни, конечно, этот цикл обеспечивается asyncio.
Вот простое решение, основанное на потоках, которые:
- работает как в Linux, так и в Windows (не полагаясь на
select
). - читает оба
stdout
а такжеstderr
асинхронно. - не полагается на активный опрос с произвольным временем ожидания (удобен для ЦП).
- не использует
asyncio
(что может конфликтовать с другими библиотеками). - выполняется до завершения дочернего процесса.
printer.py
import time
import sys
sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)
reader.py
import queue
import subprocess
import sys
import threading
def enqueue_stream(stream, queue, type):
for line in iter(stream.readline, b''):
queue.put(str(type) + line.decode('utf-8'))
stream.close()
def enqueue_process(process, queue):
process.wait()
queue.put('x')
p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()
while True:
line = q.get()
if line[0] == 'x':
break
if line[0] == '2': # stderr
sys.stdout.write("\033[0;31m") # ANSI red color
sys.stdout.write(line[1:])
if line[0] == '2':
sys.stdout.write("\033[0m") # reset ANSI code
sys.stdout.flush()
tp.join()
to.join()
te.join()
Вот мой код, используемый для перехвата всех выходных данных подпроцесса ASAP, включая частичные строки. Он качает одновременно и stdout и stderr в почти правильном порядке.
Протестировано и корректно работает на Python 2.7 linux & windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
Одним из решений является создание другого процесса для выполнения чтения процесса или создание потока процесса с тайм-аутом.
Вот потоковая версия функции тайм-аута:
http://code.activestate.com/recipes/473878/
Тем не менее, вам нужно прочитать стандартный вывод по мере его поступления? Другое решение может заключаться в выводе выходных данных в файл и ожидании завершения процесса с использованием p.wait ().
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
Существующие решения не работают для меня (подробности ниже). Наконец, работало, чтобы реализовать readline с использованием read(1) (основываясь на этом ответе). Последний не блокирует:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Почему существующие решения не работают:
- Решения, которые требуют readline (включая решения на основе очереди), всегда блокируются. Трудно (невозможно?) Убить поток, который выполняет readline. Он будет уничтожен только после завершения процесса, который его создал, но не когда процесс, производящий вывод, будет уничтожен.
- Смешивание низкоуровневого fcntl с высокоуровневыми вызовами readline может не работать должным образом, как указал anonnn.
- Использование select.poll() удобно, но не работает в Windows в соответствии с документами Python.
- Использование сторонних библиотек кажется излишним для этой задачи и добавляет дополнительные зависимости.
Отказ от ответственности: это работает только для торнадо
Вы можете сделать это, установив fd как неблокирующее, а затем использовать ioloop для регистрации обратных вызовов. Я упаковал это в яйцо под названием tornado_subprocess, и вы можете установить его через PyPI:
easy_install tornado_subprocess
Теперь вы можете сделать что-то вроде этого:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
Вы также можете использовать его с RequestHandler
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Я добавляю эту проблему для чтения некоторого подпроцесса. Откройте стандартный вывод. Вот мое не блокирующее решение для чтения:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Не первый и, вероятно, не последний, я создал пакет, который выполняет неблокирующее чтение PIPE stdout двумя разными методами, один из которых основан на ответе JF Sebastian (@jfs), а другой - простое сообщение ( ) цикл с потоком для проверки тайм-аутов.
Оба метода захвата stdout протестированы для работы как в Linux, так и в Windows, с версиями Python от 2.7 до 3.9 на момент написания.
Будучи неблокирующим, он гарантирует соблюдение тайм-аута даже при наличии нескольких дочерних и внучатых процессов и даже в Python 2.7.
Пакет также обрабатывает как байтовые, так и текстовые кодировки stdout, что является кошмаром при попытке поймать EOF.
Вы найдете пакет на https://github.com/netinvent/command_runner .
Если вам нужны хорошо протестированные неблокирующие реализации чтения, попробуйте (или взломайте код):
pip install command_runner
from command_runner import command_runner
exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')
Вы можете найти основной неблокирующий код чтения в
Эта версия неблокирующего чтения не требует специальных модулей и будет работать "из коробки" на большинстве дистрибутивов Linux.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
У меня есть проблема с оригинальным вопросником, но я не хотел вызывать темы. Я смешал решение Джесси с прямым read() из канала и собственным обработчиком буфера для чтения строк (однако, мой подпроцесс - ping - всегда записывал полные строки <размер системной страницы). Я избегаю ожидания, занятого только чтением в часах, зарегистрированных в gobject. В эти дни я обычно запускаю код в mainLoop gobject, чтобы избежать потоков.
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
Наблюдатель
def watch(f, *other):
print 'reading',f.read()
return True
И основная программа устанавливает пинг, а затем вызывает gobject mail loop.
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
Любая другая работа привязана к обратным вызовам в gobject.
Добавление этого ответа здесь, поскольку он предоставляет возможность устанавливать неблокирующие каналы в Windows и Unix.
Все ctypes
подробности благодаря ответу @ techtonik.
Существует немного измененная версия, которая будет использоваться как в системах Unix, так и в Windows.
- Совместим с Python3 (требуется лишь незначительное изменение).
- Включает в себя версию posix и определяет исключение для использования для любого из них.
Таким образом, вы можете использовать ту же функцию и исключение для кода Unix и Windows.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackru.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
Чтобы избежать чтения неполных данных, я написал собственный генератор readline (который возвращает строку байтов для каждой строки).
Это генератор, так что вы можете, например...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Модуль выбора помогает определить, где находится следующий полезный ввод.
Тем не менее, вы почти всегда счастливее с отдельными темами. Один выполняет блокировку чтения стандартного ввода, другой - везде, где вы не хотите блокировать.
В моем случае мне понадобился модуль регистрации, который ловит выходные данные из фоновых приложений и увеличивает их (добавляя метки времени, цвета и т. Д.).
Я закончил с фоновым потоком, который делает фактический ввод / вывод. Следующий код предназначен только для платформ POSIX. Я снял ненужные детали.
Если кто-то собирается использовать этого зверя на долгое время, подумайте об управлении открытыми дескрипторами. В моем случае это не было большой проблемой.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
Зачем беспокоить поток и очередь? в отличие от readline(), BufferedReader.read1() не блокирует ожидание \r\n, он возвращает ASAP, если есть какие-либо выходные данные.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
Это пример запуска интерактивной команды в подпроцессе, и стандартный вывод является интерактивным с использованием псевдотерминала. Вы можете обратиться к: /questions/18469654/zapustite-interaktivnyij-bash-s-popen-i-vyidelennyim-tty-python/18469676#18469676
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Моя проблема немного отличается, так как я хотел собрать как stdout, так и stderr из запущенного процесса, но, в конечном счете, то же самое, поскольку я хотел отобразить вывод в виджете как сгенерированный.
Я не хотел прибегать ко многим из предложенных обходных путей с использованием очередей или дополнительных потоков, поскольку они не должны быть необходимы для выполнения такой распространенной задачи, как запуск другого сценария и сбор его выходных данных.
Прочитав предложенные решения и документы по Python, я решил свою проблему с помощью реализации ниже. Да, это работает только для POSIX, так как я использую select
вызов функции.
Я согласен с тем, что документы сбивают с толку, и реализация такой неудобной задачи сценария неудобна. Я считаю, что более старые версии Python имеют разные значения по умолчанию для Popen
и разные объяснения, которые создали много путаницы. Кажется, это хорошо работает как для Python 2.7.12, так и для 3.5.2.
Ключ должен был установить bufsize=1
для буферизации строки, а затем universal_newlines=True
обрабатывать как текстовый файл вместо двоичного файла, который, по-видимому, становится значением по умолчанию при настройке bufsize=1
,
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG и VERBOSE - это просто макросы, которые печатают вывод на терминал.
Это решение ИМХО эффективно на 99,99%, так как оно все еще использует блокировку readline
функция, поэтому мы предполагаем, что подпроцесс хорош и выводит полные строки.
Я приветствую отзывы, чтобы улучшить решение, так как я все еще новичок в Python.
Основываясь на ответе Дж. Ф. Себастьяна и нескольких других источниках, я собрал простой менеджер подпроцессов. Он обеспечивает запрос неблокируемого чтения, а также запускает несколько процессов параллельно. Он не использует специфичный для ОС вызов (который я знаю) и, следовательно, должен работать где угодно.
Это доступно из pypi, так что просто pip install shelljob
, Обратитесь к странице проекта для примеров и полных документов.
Я создал библиотеку на основе решения Дж. Ф. Себастьяна. Вы можете использовать это.
Я также столкнулся с проблемой, описанной Jesse, и решил ее с помощью "выбора", как это сделали Bradley Odell, Andy Jackson и другие, но в режиме блокировки, чтобы избежать петли занятости. Он использует фиктивную трубу как фальшивый стандарт. Блоки выбора и ждут, когда будет готов либо стандартный ввод, либо канал. Когда клавиша нажата, stdin разблокирует выбор, и значение клавиши можно получить с помощью read(1). Когда другой поток записывает в канал, канал разблокирует выбор, и это может быть воспринято как указание на то, что потребность в stdin закончена. Вот некоторый ссылочный код:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
Недавно я наткнулся на ту же проблему, мне нужно прочитать одну строку за раз из потока (хвостовой запуск в подпроцессе) в неблокирующем режиме. Я хотел избежать следующих проблем: не записывать процессор, не читать поток одним байтом (как readline сделал) и т. д.
Вот моя реализация https://gist.github.com/grubberr/5501e1a9760c3eab5e0a она не поддерживает Windows (опрос), не обрабатывает EOF, но она хорошо работает для меня
Это решение использует select
модуль для "чтения любых доступных данных" из потока ввода-вывода. Эта функция первоначально блокируется до тех пор, пока данные не станут доступны, но затем считывает только те данные, которые доступны и не блокируются в дальнейшем.
Учитывая тот факт, что он использует select
модуль, это работает только на Unix.
Код полностью PEP8-совместимый.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
РЕДАКТИРОВАТЬ: эта реализация по-прежнему блокирует. Вместо этого используйте ответ JFSebastian.
Я попробовал лучший ответ, но дополнительный риск и обслуживание кода потока вызывали беспокойство.
Просматривая модуль io(и ограничиваясь 2.6), я нашел BufferedReader. Это моё безрезультатное, неблокирующее решение.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line