Почему `Future::poll` не вызывается повторно после возврата`NotReady`?

Рассмотрим следующий код

extern crate futures;

use std::sync::{atomic, Arc};
use futures::*;

struct F(Arc<atomic::AtomicBool>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        if self.0.load(atomic::Ordering::Relaxed) {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let flag = Arc::new(atomic::AtomicBool::new(false));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        flag.store(true, atomic::Ordering::Relaxed);
    });
    // ::std::thread::sleep_ms(20);
    let result = future.wait();
    println!("result: {:?}", result);
}

Порожденная нить устанавливает флаг, которого ждет будущее. Мы также спим порожденную нить, поэтому начальный .poll() позвонить с .wait() до того, как флаг установлен. Это вызывает .wait() блокировать (казалось бы) на неопределенный срок. Если мы раскомментируем другой thread::sleep_ms, .wait() возвращает и распечатывает результат (()).

Я ожидаю, что текущий поток попытается решить будущее, вызвав poll несколько раз, так как мы блокируем текущий поток. Однако этого не происходит.

Я попытался прочитать некоторые документы, и кажется, что проблема в том, что поток parkред после получения NotReady от poll первый раз. Однако мне не понятно, почему это так, или как можно обойти это.

Что мне не хватает?

1 ответ

Решение

Зачем вам нужно парковать ожидающее будущее, а не опрашивать его повторно? Ответ довольно очевиден, ИМХО. Потому что в конце концов это быстрее и эффективнее!

Чтобы многократно опрашивать будущее (которое можно назвать " занятым ожиданием "), библиотека должна будет решить, делать это часто или редко, и ни один из ответов не является удовлетворительным. Делайте это часто, и вы тратите впустую циклы процессора, делайте это редко, и код медленно реагирует.

Так что да, вам нужно припарковать задачу, когда вы чего-то ждете, а затем отменить ее, когда закончите ждать. Как это:

#![allow(deprecated)]

extern crate futures;

use std::sync::{Arc, Mutex};
use futures::*;
use futures::task::{park, Task};

struct Status {
    ready: bool,
    task: Option<Task>,
}

#[allow(dead_code)]
struct F(Arc<Mutex<Status>>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        let mut status = self.0.lock().expect("!lock");
        if status.ready {
            Ok(Async::Ready(()))
        } else {
            status.task = Some(park());
            Ok(Async::NotReady)
        }
    }
}

#[test]
fn test() {
    let flag = Arc::new(Mutex::new(Status {
                                       ready: false,
                                       task: None,
                                   }));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        let mut status = flag.lock().expect("!lock");
        status.ready = true;
        if let Some(ref task) = status.task {
            task.unpark()
        }
    });
    let result = future.wait();
    println!("result: {:?}", result);
}

Обратите внимание, что Future::poll Здесь он делает несколько вещей: он проверяет внешнее состояние и парковает задачу, поэтому можно участвовать в гонке, например, когда:

  1. poll проверяет переменную и находит ее false;
  2. внешний код устанавливает переменную в true;
  3. внешний код проверяет, запаркована ли задача, и обнаруживает, что это не так;
  4. poll паркует задание, но бум! уже слишком поздно, никто не собирается больше его не парковать.

Чтобы избежать каких-либо гонок, я использовал Mutex синхронизировать эти взаимодействия.

PS Если все, что вам нужно, это обернуть результат потока в Future затем рассмотрите возможность использования oneshot канал: у него есть Receiver который реализует Future интерфейс уже.

Другие вопросы по тегам