Как обрабатывать блокировку ввода-вывода в Rust или длительные вызовы внешних функций в целом
Мне нужно прочитать данные, предоставленные внешним процессом, через файловый дескриптор posix в моей программе Rust. Соединение fd поддерживается очень долго (часы), и другая сторона время от времени передает мне данные. Поэтому мне нужно постоянно читать и обрабатывать поток данных.
Для этого я написал цикл, который вызывает libc::read()
(на самом деле readv), чтобы читать данные и обрабатывать их при получении. Так как это блокирует весь планировщик, я создаю задачу для нового планировщика (task::spawn_sched(SingleThreaded)
). Это работает нормально, пока работает, но я просто не могу найти способ аккуратно отключить цикл.
Поскольку цикл блокирует большую часть времени, я не могу использовать порт / канал, чтобы уведомить цикл о выходе.
Я попытался убить задачу цикла, сняв ее с помощью сбойной связанной задачи (создать задачу цикла под наблюдением, создать связанную задачу внутри нее и дождаться сигнала на порте, прежде чем произойдет fail!()
вместе с ним выполняем задание цикла). Это хорошо работает в тестах, но libc::read()
не прерывается (задание не завершается до завершения чтения и выполнения task::yield()
в какой-то момент.
Я многому научился, глядя на исходники libcore, но, похоже, не могу найти правильного решения.
- Есть ли способ убить (дочернюю) задачу в Rust, даже если она выполняет какой-то длинный внешний вызов функции, такой как блокирующее чтение?
- Есть ли способ сделать неблокирующее чтение для файлового дескриптора posix, чтобы Rust сохранял контроль над задачей?
- Как я могу реагировать на сигналы, например, SIGTERM, если пользователь завершает мою программу. Кажется, что-то вроде
sigaction()
в ржавчине еще нет?
1 ответ
- По словам Мозилы, убить задание уже невозможно, не говоря уже о блокировке чтения.
- Это можно будет сделать после
mozilla/rust/pull/11410
см. также мой другой отчет о проблеме для rust-zmqerickt/rust-zmq/issues/24
что также зависит от этого. (извините за ссылки) - Может быть, слушатель сигнала будет работать для вас.
Есть ли способ убить (дочернюю) задачу в Rust, даже если она выполняет длинный вызов внешней функции, например, блокирующее чтение?
Нет.
Смотрите также:
- Как Rust справляется с убивающими потоками?
- Как завершить или приостановить поток Rust из другого потока?
- Каким стандартным способом вывести поток Rust из блокирующих операций?
Есть ли способ сделать неблокирующее [...] чтение, чтобы Rust сохранил контроль над задачей?
Да.
Смотрите также:
- Как я могу прочитать неблокирующий из стандартного ввода?
- Как мне прочитать вывод дочернего процесса без блокировки в Rust?
- Как я могу заставить поток, для которого заблокировано чтение из файла, возобновить работу в Rust?
- Принудительное неблокирующее чтение с помощью TcpStream
на дескрипторе файла POSIX
Да.
Смотрите также:
- Как я могу читать из определенного необработанного файлового дескриптора в Rust?
- Как мне написать в определенный дескриптор необработанного файла из Rust?
- Как получить tokio-io async_read для дескриптора файла
- Как асинхронно читать файл?
Как я могу реагировать на сигналы
Определитесь с желаемой опорой платформы, а затем выберите подходящий ящик.
Смотрите также:
- Как ловить сигналы в Rust
- Есть ли способ прослушивать сигналы в Windows
- Как обрабатывать сигнал SIGSEGV в пользовательском пространстве с помощью Rust?
Собираем все вместе
use future::Either;
use signal_hook::iterator::Signals;
use std::os::unix::io::FromRawFd;
use tokio::{fs::File, io, prelude::*};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
fn main() -> Result<()> {
let signals = Signals::new(&[signal_hook::SIGUSR1])?;
let signals = signals.into_async()?;
let input = unsafe { std::fs::File::from_raw_fd(5) };
let input = File::from_std(input);
let lines = io::lines(std::io::BufReader::new(input));
let signals = signals.map(Either::A);
let lines = lines.map(Either::B);
let combined = signals.select(lines);
tokio::run({
combined
.map_err(|e| panic!("Early error: {}", e))
.for_each(|v| match v {
Either::A(signal) => {
println!("Got signal: {:?}", signal);
Err(())
}
Either::B(data) => {
println!("Got data: {:?}", data);
Ok(())
}
})
});
Ok(())
}
Cargo.toml
[package]
name = "future_example"
version = "0.1.0"
authors = ["An Devloper <an.devloper@example.com>"]
edition = "2018"
[dependencies]
tokio = "0.1.22"
signal-hook = { version = "0.1.9", features = ["tokio-support"] }
shim.sh
#!/bin/bash
set -eu
exec 5< /tmp/testpipe
exec ./target/debug/future_example
Исполнение
cargo build
mkfifo /tmp/testpipe
./shim.sh
Другой терминал
printf 'hello\nthere\nworld' > /tmp/testpipe
kill -s usr1 $PID_OF_THE_PROCESS