Каков наилучший подход для инкапсуляции блокирующего ввода-вывода в будущем?

Я читаю документацию Tokio и задаюсь вопросом, каков наилучший подход для инкапсуляции дорогостоящих синхронных операций ввода-вывода в будущем.

Благодаря структуре реактора мы получаем преимущество модели "зеленых потоков": несколько потоков ОС обрабатывают множество параллельных задач через исполнителя.

Модель будущего Tokio зависит от спроса, что означает, что само будущее будет опрашивать свое внутреннее состояние, чтобы предоставить информацию о его завершении; позволяя противодавление и возможности отмены. Насколько я понимаю, фаза опроса будущего должна быть неблокируемой, чтобы работать хорошо.

Ввод-вывод, который нужно инкапсулировать, можно рассматривать как длительную атомарную и дорогостоящую операцию. В идеале, независимая задача будет выполнять ввод-вывод, а соответствующее будущее будет опрашивать поток ввода-вывода для получения статуса завершения.

Я вижу только два варианта:

  • Включите блокирующий ввод / вывод в poll функция будущего.
  • порождает поток ОС для выполнения операций ввода-вывода и использования будущего механизма для опроса его состояния, как показано в документации

Насколько я понимаю, ни одно из этих решений не является оптимальным и не дает полного преимущества модели "зеленой нити" (первое не рекомендуется в документации, а второе не проходит через исполнителя, предоставленного каркасом реактора). Есть ли другое решение?

1 ответ

Решение

В идеале, независимая задача будет выполнять ввод-вывод, а соответствующее будущее будет опрашивать поток ввода-вывода для получения статуса завершения.

Да, это то, что рекомендует Токио, и для чего были созданы такие ящики, как futures-cpupool и tokio-threadpool. Обратите внимание, что это не ограничивается вводом / выводом, но действительно для любой длительной синхронной задачи!

В этом случае вы планируете закрытие для запуска в пуле. Сам пул выполняет работу, чтобы проверить, завершено ли еще закрытие блокировки, и выполняет Future черта характера.

extern crate futures;
extern crate futures_cpupool;

use futures::{future, Future};
use futures_cpupool::CpuPool;
use std::thread;
use std::time::Duration;

fn main() {
    let pool = CpuPool::new(8);

    let a = pool.spawn_fn(|| {
        thread::sleep(Duration::from_secs(3));
        future::ok::<_, ()>(3)
    });
    let b = pool.spawn_fn(|| {
        thread::sleep(Duration::from_secs(1));
        future::ok::<_, ()>(1)
    });

    let c = a.join(b).map(|(a, b)| a + b);

    let result = c.wait();
    println!("{:?}", result);
}

Обратите внимание, что это не эффективный способ спать, это просто заполнитель для некоторой операции блокировки. Если вам действительно нужно спать, используйте что-то вроде фьючерсного таймера или токио-таймера.

Вы можете видеть, что общее время составляет всего 3 секунды:

$ time ./target/debug/example
Ok(4)

real    0m3.021s
user    0m0.007s
sys     0m0.010s

Также вы можете использовать tokio-threadpool для того же результата:

extern crate tokio; // 0.1.7
extern crate tokio_threadpool; // 0.1.2

use std::{thread, time::Duration};
use tokio::{prelude::*, runtime::Runtime};

fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> {
    future::poll_fn(move || {
        tokio_threadpool::blocking(|| {
            thread::sleep(Duration::from_secs(seconds));
            seconds
        })
    })
}

fn main() {
    let a = delay_for(3);
    let b = delay_for(1);
    let sum = a.join(b).map(|(a, b)| a + b);

    let mut runtime = Runtime::new().expect("Unable to start the runtime");
    let result = runtime.block_on(sum);
    println!("{:?}", result);
}

ни одно из решений не является оптимальным и не может в полной мере использовать преимущества модели Green-Threading

Это правильно - потому что у вас нет чего-то асинхронного! Вы пытаетесь объединить две разные методологии, и между ними должен быть какой-то уродливый кусочек.

во-вторых, не проходите через исполнителя, предусмотренного каркасом реактора

Я не уверен, что вы имеете в виду здесь. В приведенном выше примере есть только один исполнитель; тот, неявно созданный wait, Пул потоков имеет некоторую внутреннюю логику, которая проверяет, завершен ли поток, но это должно запускаться только тогда, когда исполнитель пользователя poll сидеть.

Другие вопросы по тегам