Как мне прочитать выходные данные дочернего процесса без блокировки в Rust?
Я делаю небольшое приложение ncurses в Rust, которое должно взаимодействовать с дочерним процессом. У меня уже есть прототип, написанный на Common Lisp; Надеюсь, что GIF покажет, что я хочу сделать. Я пытаюсь переписать это, потому что CL использует огромное количество памяти для такого маленького инструмента.
Я раньше не использовал Rust (или другие языки низкого уровня), и у меня возникли проблемы с выяснением того, как взаимодействовать с подпроцессом.
То, что я сейчас делаю, примерно так:
Создайте процесс:
let mut program = match Command::new(command) .args(arguments) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(child) => child, Err(_) => { println!("Cannot run program '{}'.", command); return; }, };
Передайте его в бесконечный (до тех пор, пока пользователь не выйдет) цикл, который читает и обрабатывает ввод и прослушивает вывод, подобный этому (и записывает его на экран):
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) { match program.stdout { Some(ref mut out) => { let mut buf_string = String::new(); match out.read_to_string(&mut buf_string) { Ok(_) => output_viewer.append_string(buf_string), Err(_) => return, }; }, None => return, }; }
Призыв к read_to_string
однако блокирует программу до завершения процесса. Из того, что я вижу read_to_end
а также read
также, кажется, заблокировать. Если я попытаюсь запустить что-то вроде ls
который выходит сразу, это работает, но с чем-то, что не выходит, как python
или же sbcl
это продолжается только после того, как я вручную уничтожаю подпроцесс.
Редактировать:
Основываясь на этом ответе, я изменил код для использования BufReader
:
fn listen_for_output(program: &mut Child,
output_viewer: &TextViewer) {
match program.stdout.as_mut() {
Some(out) => {
let buf_reader = BufReader::new(out);
for line in buf_reader.lines() {
match line {
Ok(l) => {
output_viewer.append_string(l);
},
Err(_) => return,
};
}
},
None => return,
}
}
Однако проблема остается прежней. Он будет читать все доступные строки, а затем блокировать. Поскольку инструмент должен работать с любой программой, нет способа угадать, когда закончится вывод, прежде чем пытаться читать. Кажется, не существует способа установить время ожидания для BufReader
или.
4 ответа
Потоки блокируются по умолчанию. Потоки TCP/IP, потоки файловой системы, потоки каналов - все они блокируются. Когда вы указываете потоку дать вам кусок байтов, он остановится и будет ждать, пока у него не будет заданного количества байтов или пока что-то еще не произойдет ( прерывание, конец потока, ошибка).
Операционные системы стремятся вернуть данные в процесс чтения, поэтому, если все, что вам нужно, это дождаться следующей строки и обработать ее, как только она придет, тогда метод, предложенный Шепмастером в " Невозможно передать по трубопроводу или из порожденного потомка" процесс не раз работает. (Теоретически это не обязательно, потому что операционной системе разрешено BufReader
ждать больше данных в read
, но на практике операционные системы предпочитают ранние "короткие чтения" ожиданиям.
Это просто BufReader
подход перестает работать, когда вам нужно обрабатывать несколько потоков (например, stdout
а также stderr
дочернего процесса) или несколько процессов. Например, BufReader
на основе подхода может зайти в тупик, когда дочерний процесс ожидает, когда вы истощите его stderr
труба в то время как ваш процесс заблокирован в ожидании пустого stdout
,
Точно так же вы не можете использовать BufReader
когда вы не хотите, чтобы ваша программа ждала дочерний процесс бесконечно долго. Может быть, вы хотите отобразить индикатор выполнения или таймер, пока ребенок все еще работает и не дает никакого вывода.
Вы не можете использовать BufReader
основанный на подходе, если ваша операционная система не стремится вернуть данные процессу (предпочитает "полное чтение", а не "короткое чтение"), потому что в этом случае несколько последних строк, напечатанных дочерним процессом, могут оказаться серыми зона: операционная система получила их, но они недостаточно велики, чтобы заполнить BufReader
Буфер.
BufReader
ограничивается тем, что Read
Интерфейс позволяет делать это с потоком, он не менее блокирует, чем основной поток. Чтобы быть эффективными, он будет читать входные данные порциями, сообщая операционной системе, чтобы заполнить столько своего буфера, сколько он имеет в наличии.
Вы можете быть удивлены, почему чтение данных в блоках так важно здесь, почему BufReader
просто прочитайте данные побайтно. Проблема в том, что для чтения данных из потока нам нужна помощь операционной системы. С другой стороны, мы не операционная система, мы работаем изолированно от нее, чтобы не связываться с ней, если что-то пойдет не так с нашим процессом. Таким образом, для вызова операционной системы необходим переход в "режим ядра", что также может привести к "переключению контекста". Вот почему вызов операционной системы для чтения каждого байта обходится дорого. Нам нужно как можно меньше вызовов ОС, и мы получаем потоковые данные в пакетном режиме.
Чтобы ждать в потоке без блокировки, вам нужен неблокирующий поток. MIO обещает иметь необходимую неблокирующую поддержку потоков для каналов, скорее всего, с PipeReader, но я пока не проверял это.
Неблокирующая природа потока должна позволять читать данные порциями, независимо от того, предпочитает ли операционная система "короткие чтения" или нет. Потому что неблокирующий поток никогда не блокируется. Если в потоке нет данных, это просто говорит вам об этом.
В отсутствие неблокирующего потока вам придется прибегать к порождающим потокам, чтобы блокирующие чтения выполнялись в отдельном потоке и, таким образом, не блокировали ваш основной поток. Вы также можете прочитать поток за байтом, чтобы немедленно отреагировать на разделитель строк, если операционная система не предпочитает "короткие чтения". Вот рабочий пример: https://gist.github.com/ArtemGr/db40ae04b431a95f2b78.
Токио-процесс
Вот пример использования tokio и tokio-process.
use std::{
io::BufReader,
process::{Command, Stdio},
};
use tokio::{io, prelude::*, runtime::Runtime}; // 0.1.18
use tokio_process::CommandExt; // 0.2.3
fn main() {
let mut cmd = Command::new("/tmp/slow.bash")
.stdout(Stdio::piped())
.spawn_async()
.expect("cannot spawn");
let stdout = cmd.stdout().take().expect("no stdout");
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on({
io::lines(BufReader::new(stdout))
.inspect(|s| println!("> {}", s))
.collect()
});
println!("All the lines: {:?}", result);
}
Tokio-Threadpool
Вот пример использования Tokio и Tokio-Threadpool. Мы запускаем процесс в потоке, используя blocking
функция. Мы конвертируем это в поток с stream::poll_fn
use std::process::{Command, Stdio};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13
fn stream_command_output(
mut command: Command,
) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
// Ensure that the output is available to read from and start the process
let mut child = command
.stdout(Stdio::piped())
.spawn()
.expect("cannot spawn");
let mut stdout = child.stdout.take().expect("no stdout");
// Create a stream of data
stream::poll_fn(move || {
// Perform blocking IO
tokio_threadpool::blocking(|| {
// Allocate some space to store anything read
let mut data = vec![0; 128];
// Read 1-128 bytes of data
let n_bytes_read = stdout.read(&mut data).expect("cannot read");
if n_bytes_read == 0 {
// Stdout is done
None
} else {
// Only return as many bytes as we read
data.truncate(n_bytes_read);
Some(data)
}
})
})
}
fn main() {
let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on({
output_stream
.map(|d| String::from_utf8(d).expect("Not UTF-8"))
.fold(Vec::new(), |mut v, s| {
print!("> {}", s);
v.push(s);
Ok(v)
})
});
println!("All the lines: {:?}", result);
}
Есть множество возможных компромиссов, которые могут быть сделаны здесь. Например, всегда выделять 128 байтов не идеально, но это просто реализовать.
Служба поддержки
Для справки, здесь slow.bash:
#!/usr/bin/env bash
set -eu
val=0
while [[ $val -lt 10 ]]; do
echo $val
val=$(($val + 1))
sleep 1
done
Смотрите также:
Если поддержки Unix достаточно, вы также можете сделать два выходных потока неблокирующими и опрашивать их, как если бы вы делали это на
TcpStream
с
set_nonblocking
функция.
В
ChildStdout
и
ChildStderr
возвращенные командой spawn являются
Stdio
(и содержат дескриптор файла), вы можете напрямую изменить поведение чтения этого дескриптора, чтобы сделать его неблокирующим.
Основываясь на работе jcreekmore / timeout-readwrite-rs и anowell / nonblock-rs, я использую эту оболочку для изменения дескрипторов потока:
extern crate libc;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK};
fn set_nonblocking<H>(handle: &H, nonblocking: bool) -> std::io::Result<()>
where
H: Read + AsRawFd,
{
let fd = handle.as_raw_fd();
let flags = unsafe { fcntl(fd, F_GETFL, 0) };
if flags < 0 {
return Err(std::io::Error::last_os_error());
}
let flags = if nonblocking{
flags | O_NONBLOCK
} else {
flags & !O_NONBLOCK
};
let res = unsafe { fcntl(fd, F_SETFL, flags) };
if res != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
}
Вы можете управлять этими двумя потоками как любым другим неблокирующим потоком. Следующий пример основан на ящике опроса, который упрощает обработку события чтения и
BufReader
для чтения строки:
use std::process::{Command, Stdio};
use std::path::PathBuf;
use std::io::{BufReader, BufRead};
use std::thread;
extern crate polling;
use polling::{Event, Poller};
fn main() -> Result<(), std::io::Error> {
let path = PathBuf::from("./worker.sh").canonicalize()?;
let mut child = Command::new(path)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("Failed to start worker");
let handle = thread::spawn({
let stdout = child.stdout.take().unwrap();
set_nonblocking(&stdout, true)?;
let mut reader_out = BufReader::new(stdout);
let stderr = child.stderr.take().unwrap();
set_nonblocking(&stderr, true)?;
let mut reader_err = BufReader::new(stderr);
move || {
let key_out = 1;
let key_err = 2;
let mut out_closed = false;
let mut err_closed = false;
let poller = Poller::new().unwrap();
poller.add(reader_out.get_ref(), Event::readable(key_out)).unwrap();
poller.add(reader_err.get_ref(), Event::readable(key_err)).unwrap();
let mut line = String::new();
let mut events = Vec::new();
loop {
// Wait for at least one I/O event.
events.clear();
poller.wait(&mut events, None).unwrap();
for ev in &events {
// stdout is ready for reading
if ev.key == key_out {
let len = match reader_out.read_line(&mut line) {
Ok(len) => len,
Err(e) => {
println!("stdout read returned error: {}", e);
0
}
};
if len == 0 {
println!("stdout closed (len is null)");
out_closed = true;
poller.delete(reader_out.get_ref()).unwrap();
} else {
print!("[STDOUT] {}", line);
line.clear();
// reload the poller
poller.modify(reader_out.get_ref(), Event::readable(key_out)).unwrap();
}
}
// stderr is ready for reading
if ev.key == key_err {
let len = match reader_err.read_line(&mut line) {
Ok(len) => len,
Err(e) => {
println!("stderr read returned error: {}", e);
0
}
};
if len == 0 {
println!("stderr closed (len is null)");
err_closed = true;
poller.delete(reader_err.get_ref()).unwrap();
} else {
print!("[STDERR] {}", line);
line.clear();
// reload the poller
poller.modify(reader_err.get_ref(), Event::readable(key_err)).unwrap();
}
}
}
if out_closed && err_closed {
println!("Stream closed, exiting process thread");
break;
}
}
}
});
handle.join().unwrap();
Ok(())
}
Кроме того, при использовании с оболочкой над EventFd становится возможным легко остановить процесс из другого потока без блокировки и активного опроса и использовать только один поток.
РЕДАКТИРОВАТЬ: Кажется, что ящик для опроса автоматически устанавливает опрашиваемые дескрипторы в неблокирующем режиме после моих тестов. Функция set_nonblocking по-прежнему полезна, если вы хотите напрямую использовать объект nix:: poll.
Я встречал достаточно случаев, когда было полезно взаимодействовать с подпроцессом через текст, разделенный строками, поэтому я написал для него ящик, interactive_process.
Я полагаю, что исходная проблема уже давно решена, но я подумал, что это может быть полезно другим.