Как перебрать Vec функций, возвращающих Futures в Rust?

Можно ли зациклить Vec, вызывая метод, который возвращает Future на каждом и построить цепочку Futures, чтобы быть оцененным (в конечном счете) потребителем? Следует ли выполнить позже FutureS будет зависеть от результатов ранее Futureв Vec,

Чтобы уточнить:

Я работаю над приложением, которое может извлекать данные из произвольного набора исходных источников.

Запрашиваемые данные будут проверяться с каждым из источников по очереди. Если первый источник имел ошибку (Err), или не было доступных данных (None), тогда будет опробован второй источник и так далее.

Каждый источник должен быть опробован ровно один раз, и ни один источник не должен быть опробован до тех пор, пока все источники не вернут свои результаты. Ошибки регистрируются, но в противном случае игнорируются, передавая запрос следующему источнику данных верхнего уровня.

У меня есть некоторый рабочий код, который делает это для получения метаданных:

/// Attempts to read/write data to various external sources. These are
/// nested types, because a data source may exist as both a reader and a writer
struct StoreManager {
    /// Upstream data sources
    readers: Vec<Rc<RefCell<StoreRead>>>,
    /// Downstream data sinks
    writers: Vec<Rc<RefCell<StoreWrite>>>,
}

impl StoreRead for StoreManager {
    fn metadata(self: &Self, id: &Identifier) -> Box<Future<Option<Metadata>, Error>> {
       Box::new(ok(self.readers
            .iter()
            .map(|store| {
                executor::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {
                    error!("Error on metadata(): {:?}", err);
                    None
                })
            })
            .find(Option::is_some)
            .unwrap_or(None)))
    }
}

Помимо моего несчастья со всеми Box а также Rc/RefCell ерунда, мое настоящее беспокойство с executor::block_on() вызов. Это блокирует, ожидая каждого Future вернуть результат, прежде чем перейти к следующему.

Учитывая, что можно позвонить fn_returning_future().or_else(|_| other_fn()) и так далее, возможно ли построить такую ​​динамическую цепочку? Или это требование полной оценки каждого Future в итераторе, прежде чем перейти к следующему?

1 ответ

Решение

Ты можешь использовать stream::unfold преобразовать одно значение в поток. В этом случае мы можем использовать IntoIter итератор как это единственное значение.

extern crate futures; // 0.2.1

use futures::{executor, future, stream, Future, FutureExt, Stream, StreamExt};

type Error = ();

fn network_request(val: i32) -> impl Future<Item = i32, Error = Error> {        
    // Just for demonstration, don't do this in a real program
    use std::{thread, time::{Duration, Instant}};
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    future::ok(val * 100)
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = i32, Error = Error> {
    stream::unfold(vals.into_iter(), |mut vals| {
        match vals.next() {
            Some(v) => network_request(v).map(|v| Some((v, vals))).left_future(),
            None => future::ok(None).right_future(),
        }
    })
}

fn main() {
    let s = requests_in_sequence(vec![1, 2, 3]);
    let f = s.for_each(|v| {
        println!("-> {}", v);
        Ok(())
    });
    executor::block_on(f).unwrap();
}
Resolving 1 at Instant { tv_sec: 3416957, tv_nsec: 29270595 }
-> 100
Resolving 2 at Instant { tv_sec: 3416958, tv_nsec: 29450854 }
-> 200
Resolving 3 at Instant { tv_sec: 3416959, tv_nsec: 29624479 }
-> 300

Игнорировать Err а также None Придется Error к Item, делая Item наберите "А Result<Option<T>, Error>:

extern crate futures; // 0.2.1

use futures::{executor, future, stream, Future, FutureExt, Never, Stream, StreamExt};

struct Error;

fn network_request(val: i32) -> impl Future<Item = Option<i32>, Error = Error> {
    // Just for demonstration, don't do this in a real program
    use std::{thread, time::{Duration, Instant}};
    thread::sleep(Duration::from_millis(100));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => future::err(Error),          // An error
        2 => future::ok(None),            // No data
        _ => future::ok(Some(val * 100)), // Success
    }
}

fn requests_in_sequence<I>(vals: I) -> impl Stream<Item = Result<Option<i32>, Error>, Error = Never>
where
    I: IntoIterator<Item = i32>,
{
    stream::unfold(vals.into_iter(), |mut vals| {
        match vals.next() {
            Some(v) => {
                network_request(v)
                    .then(|v| future::ok(Some((v, vals)))) // Convert `Item` into `Result<Option<i32>, Error>`
                    .left_future()
            }
            None => future::ok(None).right_future(),
        }
    })
}

fn main() {
    let reqs = requests_in_sequence(vec![1, 2, 3, 4, 5]);

    let success = reqs
        .filter_map(|x| future::ok(x.ok())) // Ignore all `Result::Err`
        .filter_map(|x| future::ok(x))      // Ignore all `Option::None`
        .next();                            // Get first value

    match executor::block_on(success) {
        Ok((Some(v), _s)) => println!("First success: {}", v),
        Ok((None, _s)) => println!("No successful requests"),
        Err(_) => unreachable!("Impossible to fail"),
    }
}
Resolving 1 at Instant { tv_sec: 3428278, tv_nsec: 513758474 }
Resolving 2 at Instant { tv_sec: 3428278, tv_nsec: 614059691 }
Resolving 3 at Instant { tv_sec: 3428278, tv_nsec: 714256066 }
First success: 300

Можно ли построить динамическую цепочку, как это

Да, но это требует значительного количества дополнительного распределения и косвенного обращения, а также требует наличия по крайней мере одного значения для доступа:

fn requests_in_sequence(vals: Vec<i32>) -> Box<Future<Item = i32, Error = Error>> {
    let mut vals = vals.into_iter();

    let val1 = vals.next().expect("Need at least one value to start from");

    vals.fold(Box::new(network_request(val1)), |acc, val| {
        Box::new(acc.or_else(move |_| network_request(val)))
    })
}

Смотрите также:


это требование для полной оценки каждого Future в итераторе, прежде чем перейти к следующему

Разве это не часть ваших собственных требований? Акцент мой:

Запрашиваемые данные будут проверяться с каждым из источников по очереди. Если первый источник имел ошибку (Err), или не было доступных данных (None), тогда будет опробован второй источник

Другие вопросы по тегам