Почему Future::select сначала выбирает будущее с более длительным периодом сна?

Я пытаюсь понять Future::select: в этом примере, будущее с более длительной задержкой возвращается первым.

Когда я читаю эту статью с ее примером, у меня возникает когнитивный диссонанс. Автор пишет:

select Функция запускается два (или более в случае select_all) фьючерсы и возвращает первый, заканчивающийся. Это полезно для реализации тайм-аутов.

Кажется, я не понимаю смысла select,

extern crate futures;
extern crate tokio_core;

use std::thread;
use std::time::Duration;
use futures::{Async, Future};
use tokio_core::reactor::Core;

struct Timeout {
    time: u32,
}

impl Timeout {
    fn new(period: u32) -> Timeout {
        Timeout { time: period }
    }
}

impl Future for Timeout {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Result<Async<u32>, Self::Error> {
        thread::sleep(Duration::from_secs(self.time as u64));
        println!("Timeout is done with time {}.", self.time);
        Ok(Async::Ready(self.time))
    }
}

fn main() {
    let mut reactor = Core::new().unwrap();

    let time_out1 = Timeout::new(5);
    let time_out2 = Timeout::new(1);

    let task = time_out1.select(time_out2);

    let mut reactor = Core::new().unwrap();
    reactor.run(task);
}

Мне нужно обработать раннее будущее с меньшей задержкой, а затем работать с будущим с большей задержкой. Как мне это сделать?

1 ответ

Решение

Если есть одна вещь, от которой нужно отказаться: никогда не выполняйте блокирующие или длительные операции внутри асинхронных операций.

Если вы хотите тайм-аут, используйте что-то из tokio::timer, такие как Delay или же Timeout:

extern crate futures; // 0.1.23
extern crate tokio; // 0.1.8

use futures::prelude::*;
use std::time::{Duration, Instant};
use tokio::timer::Delay;

fn main() {
    let time_out1 = Delay::new(Instant::now() + Duration::from_secs(5));
    let time_out2 = Delay::new(Instant::now() + Duration::from_secs(1));

    let task = time_out1.select(time_out2);

    tokio::run(task.map(drop).map_err(drop));
}

Чтобы понять, почему вы получаете поведение, которое вы делаете, вы должны понять реализацию фьючерсов на высоком уровне.

Когда вы звоните run есть цикл, который вызывает poll на переданное в будущем. Он зацикливается, пока будущее не вернет успех или неудачу, иначе будущее еще не сделано.

Ваша реализация poll "блокирует" этот цикл на 5 секунд, потому что ничто не может прервать вызов sleep, К тому времени, когда сон сделан, будущее готово, таким образом, это будущее выбрано.

Реализация асинхронного тайм-аута концептуально работает, проверяя часы каждый раз, когда они опрашиваются, говоря, прошло ли достаточно времени или нет.

Большая разница в том, что когда будущее возвращается, что оно еще не готово, можно проверить другое будущее. Это то, что select делает!

Драматическая реконструкция:

таймер сна

ядро: эй select, ты готов идти?

выберите: Эй future1, ты готов идти?

future1: Задержитесь на секундную [... 5 секундную передачу...] nnnnd. Да!

асинхронный таймер

ядро: эй select, ты готов идти?

выберите: Эй future1, ты готов идти?

будущее1: проверяет часы

выберите: Эй future2, ты готов идти?

будущее2: проверяет часы

ядро: эй select, ты готов идти?

[... 1 секунда проходит...]

ядро: эй select, ты готов идти?

выберите: Эй future1, ты готов идти?

будущее1: проверяет часы

выберите: Эй future2, ты готов идти?

future2: Проверки смотреть Да!


Если у вас есть операция, которая блокирует или выполняет долго, то вам нужно вывести эту работу из асинхронного цикла. Наиболее распространенным решением является использование пула потоков, такого как futures-cpupool или tokio-pool. Однако использование любого из них для реализации тайм-аута крайне неэффективно:

extern crate futures;
extern crate futures_cpupool;
extern crate tokio_core;

use std::thread;
use std::time::Duration;
use futures::Future;
use tokio_core::reactor::Core;
use futures_cpupool::CpuPool;

fn main() {
    let pool = CpuPool::new(4);

    let mut core = Core::new().unwrap();

    let time_out1 = pool.spawn_fn::<_, Result<(), ()>>(|| {
        Ok(thread::sleep(Duration::from_secs(5)))
    });
    let time_out2 = pool.spawn_fn::<_, Result<(), ()>>(|| {
        Ok(thread::sleep(Duration::from_secs(1)))
    });

    let task = time_out1.select(time_out2);

    core.run(task).unwrap();
}
Другие вопросы по тегам