Почему `Future::poll` не вызывается повторно после возврата`NotReady`?
Рассмотрим следующий код
extern crate futures;
use std::sync::{atomic, Arc};
use futures::*;
struct F(Arc<atomic::AtomicBool>);
impl Future for F {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
println!("Check if flag is set");
if self.0.load(atomic::Ordering::Relaxed) {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}
fn main() {
let flag = Arc::new(atomic::AtomicBool::new(false));
let future = F(flag.clone());
::std::thread::spawn(move || {
::std::thread::sleep_ms(10);
println!("set flag");
flag.store(true, atomic::Ordering::Relaxed);
});
// ::std::thread::sleep_ms(20);
let result = future.wait();
println!("result: {:?}", result);
}
Порожденная нить устанавливает флаг, которого ждет будущее. Мы также спим порожденную нить, поэтому начальный .poll()
позвонить с .wait()
до того, как флаг установлен. Это вызывает .wait()
блокировать (казалось бы) на неопределенный срок. Если мы раскомментируем другой thread::sleep_ms
, .wait()
возвращает и распечатывает результат (()
).
Я ожидаю, что текущий поток попытается решить будущее, вызвав poll
несколько раз, так как мы блокируем текущий поток. Однако этого не происходит.
Я попытался прочитать некоторые документы, и кажется, что проблема в том, что поток park
ред после получения NotReady
от poll
первый раз. Однако мне не понятно, почему это так, или как можно обойти это.
Что мне не хватает?
1 ответ
Зачем вам нужно парковать ожидающее будущее, а не опрашивать его повторно? Ответ довольно очевиден, ИМХО. Потому что в конце концов это быстрее и эффективнее!
Чтобы многократно опрашивать будущее (которое можно назвать " занятым ожиданием "), библиотека должна будет решить, делать это часто или редко, и ни один из ответов не является удовлетворительным. Делайте это часто, и вы тратите впустую циклы процессора, делайте это редко, и код медленно реагирует.
Так что да, вам нужно припарковать задачу, когда вы чего-то ждете, а затем отменить ее, когда закончите ждать. Как это:
#![allow(deprecated)]
extern crate futures;
use std::sync::{Arc, Mutex};
use futures::*;
use futures::task::{park, Task};
struct Status {
ready: bool,
task: Option<Task>,
}
#[allow(dead_code)]
struct F(Arc<Mutex<Status>>);
impl Future for F {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
println!("Check if flag is set");
let mut status = self.0.lock().expect("!lock");
if status.ready {
Ok(Async::Ready(()))
} else {
status.task = Some(park());
Ok(Async::NotReady)
}
}
}
#[test]
fn test() {
let flag = Arc::new(Mutex::new(Status {
ready: false,
task: None,
}));
let future = F(flag.clone());
::std::thread::spawn(move || {
::std::thread::sleep_ms(10);
println!("set flag");
let mut status = flag.lock().expect("!lock");
status.ready = true;
if let Some(ref task) = status.task {
task.unpark()
}
});
let result = future.wait();
println!("result: {:?}", result);
}
Обратите внимание, что Future::poll
Здесь он делает несколько вещей: он проверяет внешнее состояние и парковает задачу, поэтому можно участвовать в гонке, например, когда:
-
poll
проверяет переменную и находит ееfalse
; - внешний код устанавливает переменную в
true
; - внешний код проверяет, запаркована ли задача, и обнаруживает, что это не так;
-
poll
паркует задание, но бум! уже слишком поздно, никто не собирается больше его не парковать.
Чтобы избежать каких-либо гонок, я использовал Mutex
синхронизировать эти взаимодействия.
PS Если все, что вам нужно, это обернуть результат потока в Future
затем рассмотрите возможность использования oneshot
канал: у него есть Receiver
который реализует Future
интерфейс уже.