Как обрабатывать блокировку ввода-вывода в Rust или длительные вызовы внешних функций в целом

Мне нужно прочитать данные, предоставленные внешним процессом, через файловый дескриптор posix в моей программе Rust. Соединение fd поддерживается очень долго (часы), и другая сторона время от времени передает мне данные. Поэтому мне нужно постоянно читать и обрабатывать поток данных.

Для этого я написал цикл, который вызывает libc::read() (на самом деле readv), чтобы читать данные и обрабатывать их при получении. Так как это блокирует весь планировщик, я создаю задачу для нового планировщика (task::spawn_sched(SingleThreaded)). Это работает нормально, пока работает, но я просто не могу найти способ аккуратно отключить цикл.

Поскольку цикл блокирует большую часть времени, я не могу использовать порт / канал, чтобы уведомить цикл о выходе.

Я попытался убить задачу цикла, сняв ее с помощью сбойной связанной задачи (создать задачу цикла под наблюдением, создать связанную задачу внутри нее и дождаться сигнала на порте, прежде чем произойдет fail!()вместе с ним выполняем задание цикла). Это хорошо работает в тестах, но libc::read() не прерывается (задание не завершается до завершения чтения и выполнения task::yield() в какой-то момент.

Я многому научился, глядя на исходники libcore, но, похоже, не могу найти правильного решения.

  1. Есть ли способ убить (дочернюю) задачу в Rust, даже если она выполняет какой-то длинный внешний вызов функции, такой как блокирующее чтение?
  2. Есть ли способ сделать неблокирующее чтение для файлового дескриптора posix, чтобы Rust сохранял контроль над задачей?
  3. Как я могу реагировать на сигналы, например, SIGTERM, если пользователь завершает мою программу. Кажется, что-то вроде sigaction() в ржавчине еще нет?

1 ответ

  1. По словам Мозилы, убить задание уже невозможно, не говоря уже о блокировке чтения.
  2. Это можно будет сделать после mozilla/rust/pull/11410см. также мой другой отчет о проблеме для rust-zmq erickt/rust-zmq/issues/24 что также зависит от этого. (извините за ссылки)
  3. Может быть, слушатель сигнала будет работать для вас.

Есть ли способ убить (дочернюю) задачу в Rust, даже если она выполняет длинный вызов внешней функции, например, блокирующее чтение?

Нет.

Смотрите также:

Есть ли способ сделать неблокирующее [...] чтение, чтобы Rust сохранил контроль над задачей?

Да.

Смотрите также:

на дескрипторе файла POSIX

Да.

Смотрите также:

Как я могу реагировать на сигналы

Определитесь с желаемой опорой платформы, а затем выберите подходящий ящик.

Смотрите также:

Собираем все вместе

use future::Either;
use signal_hook::iterator::Signals;
use std::os::unix::io::FromRawFd;
use tokio::{fs::File, io, prelude::*};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

fn main() -> Result<()> {
    let signals = Signals::new(&[signal_hook::SIGUSR1])?;
    let signals = signals.into_async()?;

    let input = unsafe { std::fs::File::from_raw_fd(5) };
    let input = File::from_std(input);
    let lines = io::lines(std::io::BufReader::new(input));

    let signals = signals.map(Either::A);
    let lines = lines.map(Either::B);

    let combined = signals.select(lines);

    tokio::run({
        combined
            .map_err(|e| panic!("Early error: {}", e))
            .for_each(|v| match v {
                Either::A(signal) => {
                    println!("Got signal: {:?}", signal);
                    Err(())
                }
                Either::B(data) => {
                    println!("Got data: {:?}", data);
                    Ok(())
                }
            })
    });

    Ok(())
}

Cargo.toml

[package]
name = "future_example"
version = "0.1.0"
authors = ["An Devloper <an.devloper@example.com>"]
edition = "2018"

[dependencies]
tokio = "0.1.22"
signal-hook = { version = "0.1.9", features = ["tokio-support"] }

shim.sh

#!/bin/bash

set -eu

exec 5< /tmp/testpipe
exec ./target/debug/future_example

Исполнение

cargo build
mkfifo /tmp/testpipe
./shim.sh

Другой терминал

printf 'hello\nthere\nworld' > /tmp/testpipe
kill -s usr1 $PID_OF_THE_PROCESS
Другие вопросы по тегам