Каков наилучший подход для инкапсуляции блокирующего ввода-вывода в будущем?
Я читаю документацию Tokio и задаюсь вопросом, каков наилучший подход для инкапсуляции дорогостоящих синхронных операций ввода-вывода в будущем.
Благодаря структуре реактора мы получаем преимущество модели "зеленых потоков": несколько потоков ОС обрабатывают множество параллельных задач через исполнителя.
Модель будущего Tokio зависит от спроса, что означает, что само будущее будет опрашивать свое внутреннее состояние, чтобы предоставить информацию о его завершении; позволяя противодавление и возможности отмены. Насколько я понимаю, фаза опроса будущего должна быть неблокируемой, чтобы работать хорошо.
Ввод-вывод, который нужно инкапсулировать, можно рассматривать как длительную атомарную и дорогостоящую операцию. В идеале, независимая задача будет выполнять ввод-вывод, а соответствующее будущее будет опрашивать поток ввода-вывода для получения статуса завершения.
Я вижу только два варианта:
- Включите блокирующий ввод / вывод в
poll
функция будущего. - порождает поток ОС для выполнения операций ввода-вывода и использования будущего механизма для опроса его состояния, как показано в документации
Насколько я понимаю, ни одно из этих решений не является оптимальным и не дает полного преимущества модели "зеленой нити" (первое не рекомендуется в документации, а второе не проходит через исполнителя, предоставленного каркасом реактора). Есть ли другое решение?
1 ответ
В идеале, независимая задача будет выполнять ввод-вывод, а соответствующее будущее будет опрашивать поток ввода-вывода для получения статуса завершения.
Да, это то, что рекомендует Токио, и для чего были созданы такие ящики, как futures-cpupool и tokio-threadpool. Обратите внимание, что это не ограничивается вводом / выводом, но действительно для любой длительной синхронной задачи!
В этом случае вы планируете закрытие для запуска в пуле. Сам пул выполняет работу, чтобы проверить, завершено ли еще закрытие блокировки, и выполняет Future
черта характера.
extern crate futures;
extern crate futures_cpupool;
use futures::{future, Future};
use futures_cpupool::CpuPool;
use std::thread;
use std::time::Duration;
fn main() {
let pool = CpuPool::new(8);
let a = pool.spawn_fn(|| {
thread::sleep(Duration::from_secs(3));
future::ok::<_, ()>(3)
});
let b = pool.spawn_fn(|| {
thread::sleep(Duration::from_secs(1));
future::ok::<_, ()>(1)
});
let c = a.join(b).map(|(a, b)| a + b);
let result = c.wait();
println!("{:?}", result);
}
Обратите внимание, что это не эффективный способ спать, это просто заполнитель для некоторой операции блокировки. Если вам действительно нужно спать, используйте что-то вроде фьючерсного таймера или токио-таймера.
Вы можете видеть, что общее время составляет всего 3 секунды:
$ time ./target/debug/example
Ok(4)
real 0m3.021s
user 0m0.007s
sys 0m0.010s
Также вы можете использовать tokio-threadpool для того же результата:
extern crate tokio; // 0.1.7
extern crate tokio_threadpool; // 0.1.2
use std::{thread, time::Duration};
use tokio::{prelude::*, runtime::Runtime};
fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> {
future::poll_fn(move || {
tokio_threadpool::blocking(|| {
thread::sleep(Duration::from_secs(seconds));
seconds
})
})
}
fn main() {
let a = delay_for(3);
let b = delay_for(1);
let sum = a.join(b).map(|(a, b)| a + b);
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on(sum);
println!("{:?}", result);
}
ни одно из решений не является оптимальным и не может в полной мере использовать преимущества модели Green-Threading
Это правильно - потому что у вас нет чего-то асинхронного! Вы пытаетесь объединить две разные методологии, и между ними должен быть какой-то уродливый кусочек.
во-вторых, не проходите через исполнителя, предусмотренного каркасом реактора
Я не уверен, что вы имеете в виду здесь. В приведенном выше примере есть только один исполнитель; тот, неявно созданный wait
, Пул потоков имеет некоторую внутреннюю логику, которая проверяет, завершен ли поток, но это должно запускаться только тогда, когда исполнитель пользователя poll
сидеть.