Почему Future::select сначала выбирает будущее с более длительным периодом сна?
Я пытаюсь понять Future::select
: в этом примере, будущее с более длительной задержкой возвращается первым.
Когда я читаю эту статью с ее примером, у меня возникает когнитивный диссонанс. Автор пишет:
select
Функция запускается два (или более в случаеselect_all
) фьючерсы и возвращает первый, заканчивающийся. Это полезно для реализации тайм-аутов.
Кажется, я не понимаю смысла select
,
extern crate futures;
extern crate tokio_core;
use std::thread;
use std::time::Duration;
use futures::{Async, Future};
use tokio_core::reactor::Core;
struct Timeout {
time: u32,
}
impl Timeout {
fn new(period: u32) -> Timeout {
Timeout { time: period }
}
}
impl Future for Timeout {
type Item = u32;
type Error = String;
fn poll(&mut self) -> Result<Async<u32>, Self::Error> {
thread::sleep(Duration::from_secs(self.time as u64));
println!("Timeout is done with time {}.", self.time);
Ok(Async::Ready(self.time))
}
}
fn main() {
let mut reactor = Core::new().unwrap();
let time_out1 = Timeout::new(5);
let time_out2 = Timeout::new(1);
let task = time_out1.select(time_out2);
let mut reactor = Core::new().unwrap();
reactor.run(task);
}
Мне нужно обработать раннее будущее с меньшей задержкой, а затем работать с будущим с большей задержкой. Как мне это сделать?
1 ответ
Если есть одна вещь, от которой нужно отказаться: никогда не выполняйте блокирующие или длительные операции внутри асинхронных операций.
Если вы хотите тайм-аут, используйте что-то из tokio::timer
, такие как Delay
или же Timeout
:
extern crate futures; // 0.1.23
extern crate tokio; // 0.1.8
use futures::prelude::*;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
fn main() {
let time_out1 = Delay::new(Instant::now() + Duration::from_secs(5));
let time_out2 = Delay::new(Instant::now() + Duration::from_secs(1));
let task = time_out1.select(time_out2);
tokio::run(task.map(drop).map_err(drop));
}
Чтобы понять, почему вы получаете поведение, которое вы делаете, вы должны понять реализацию фьючерсов на высоком уровне.
Когда вы звоните run
есть цикл, который вызывает poll
на переданное в будущем. Он зацикливается, пока будущее не вернет успех или неудачу, иначе будущее еще не сделано.
Ваша реализация poll
"блокирует" этот цикл на 5 секунд, потому что ничто не может прервать вызов sleep
, К тому времени, когда сон сделан, будущее готово, таким образом, это будущее выбрано.
Реализация асинхронного тайм-аута концептуально работает, проверяя часы каждый раз, когда они опрашиваются, говоря, прошло ли достаточно времени или нет.
Большая разница в том, что когда будущее возвращается, что оно еще не готово, можно проверить другое будущее. Это то, что select
делает!
Драматическая реконструкция:
таймер сна
ядро: эй
select
, ты готов идти?выберите: Эй
future1
, ты готов идти?future1: Задержитесь на секундную [... 5 секундную передачу...] nnnnd. Да!
асинхронный таймер
ядро: эй
select
, ты готов идти?выберите: Эй
future1
, ты готов идти?будущее1: проверяет часы №
выберите: Эй
future2
, ты готов идти?будущее2: проверяет часы
ядро: эй
select
, ты готов идти?[... 1 секунда проходит...]
ядро: эй
select
, ты готов идти?выберите: Эй
future1
, ты готов идти?будущее1: проверяет часы №
выберите: Эй
future2
, ты готов идти?future2: Проверки смотреть Да!
Если у вас есть операция, которая блокирует или выполняет долго, то вам нужно вывести эту работу из асинхронного цикла. Наиболее распространенным решением является использование пула потоков, такого как futures-cpupool или tokio-pool. Однако использование любого из них для реализации тайм-аута крайне неэффективно:
extern crate futures;
extern crate futures_cpupool;
extern crate tokio_core;
use std::thread;
use std::time::Duration;
use futures::Future;
use tokio_core::reactor::Core;
use futures_cpupool::CpuPool;
fn main() {
let pool = CpuPool::new(4);
let mut core = Core::new().unwrap();
let time_out1 = pool.spawn_fn::<_, Result<(), ()>>(|| {
Ok(thread::sleep(Duration::from_secs(5)))
});
let time_out2 = pool.spawn_fn::<_, Result<(), ()>>(|| {
Ok(thread::sleep(Duration::from_secs(1)))
});
let task = time_out1.select(time_out2);
core.run(task).unwrap();
}