Как перебрать Vec функций, возвращающих Futures в Rust?
Можно ли зациклить Vec
, вызывая метод, который возвращает Future
на каждом и построить цепочку Future
s, чтобы быть оцененным (в конечном счете) потребителем? Следует ли выполнить позже Future
S будет зависеть от результатов ранее Future
в Vec
,
Чтобы уточнить:
Я работаю над приложением, которое может извлекать данные из произвольного набора исходных источников.
Запрашиваемые данные будут проверяться с каждым из источников по очереди. Если первый источник имел ошибку (Err
), или не было доступных данных (None
), тогда будет опробован второй источник и так далее.
Каждый источник должен быть опробован ровно один раз, и ни один источник не должен быть опробован до тех пор, пока все источники не вернут свои результаты. Ошибки регистрируются, но в противном случае игнорируются, передавая запрос следующему источнику данных верхнего уровня.
У меня есть некоторый рабочий код, который делает это для получения метаданных:
/// Attempts to read/write data to various external sources. These are
/// nested types, because a data source may exist as both a reader and a writer
struct StoreManager {
/// Upstream data sources
readers: Vec<Rc<RefCell<StoreRead>>>,
/// Downstream data sinks
writers: Vec<Rc<RefCell<StoreWrite>>>,
}
impl StoreRead for StoreManager {
fn metadata(self: &Self, id: &Identifier) -> Box<Future<Option<Metadata>, Error>> {
Box::new(ok(self.readers
.iter()
.map(|store| {
executor::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {
error!("Error on metadata(): {:?}", err);
None
})
})
.find(Option::is_some)
.unwrap_or(None)))
}
}
Помимо моего несчастья со всеми Box
а также Rc/RefCell
ерунда, мое настоящее беспокойство с executor::block_on()
вызов. Это блокирует, ожидая каждого Future
вернуть результат, прежде чем перейти к следующему.
Учитывая, что можно позвонить fn_returning_future().or_else(|_| other_fn())
и так далее, возможно ли построить такую динамическую цепочку? Или это требование полной оценки каждого Future
в итераторе, прежде чем перейти к следующему?
1 ответ
Ты можешь использовать stream::unfold
преобразовать одно значение в поток. В этом случае мы можем использовать IntoIter
итератор как это единственное значение.
extern crate futures; // 0.2.1
use futures::{executor, future, stream, Future, FutureExt, Stream, StreamExt};
type Error = ();
fn network_request(val: i32) -> impl Future<Item = i32, Error = Error> {
// Just for demonstration, don't do this in a real program
use std::{thread, time::{Duration, Instant}};
thread::sleep(Duration::from_secs(1));
println!("Resolving {} at {:?}", val, Instant::now());
future::ok(val * 100)
}
fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = i32, Error = Error> {
stream::unfold(vals.into_iter(), |mut vals| {
match vals.next() {
Some(v) => network_request(v).map(|v| Some((v, vals))).left_future(),
None => future::ok(None).right_future(),
}
})
}
fn main() {
let s = requests_in_sequence(vec![1, 2, 3]);
let f = s.for_each(|v| {
println!("-> {}", v);
Ok(())
});
executor::block_on(f).unwrap();
}
Resolving 1 at Instant { tv_sec: 3416957, tv_nsec: 29270595 }
-> 100
Resolving 2 at Instant { tv_sec: 3416958, tv_nsec: 29450854 }
-> 200
Resolving 3 at Instant { tv_sec: 3416959, tv_nsec: 29624479 }
-> 300
Игнорировать Err
а также None
Придется Error
к Item
, делая Item
наберите "А Result<Option<T>, Error>
:
extern crate futures; // 0.2.1
use futures::{executor, future, stream, Future, FutureExt, Never, Stream, StreamExt};
struct Error;
fn network_request(val: i32) -> impl Future<Item = Option<i32>, Error = Error> {
// Just for demonstration, don't do this in a real program
use std::{thread, time::{Duration, Instant}};
thread::sleep(Duration::from_millis(100));
println!("Resolving {} at {:?}", val, Instant::now());
match val {
1 => future::err(Error), // An error
2 => future::ok(None), // No data
_ => future::ok(Some(val * 100)), // Success
}
}
fn requests_in_sequence<I>(vals: I) -> impl Stream<Item = Result<Option<i32>, Error>, Error = Never>
where
I: IntoIterator<Item = i32>,
{
stream::unfold(vals.into_iter(), |mut vals| {
match vals.next() {
Some(v) => {
network_request(v)
.then(|v| future::ok(Some((v, vals)))) // Convert `Item` into `Result<Option<i32>, Error>`
.left_future()
}
None => future::ok(None).right_future(),
}
})
}
fn main() {
let reqs = requests_in_sequence(vec![1, 2, 3, 4, 5]);
let success = reqs
.filter_map(|x| future::ok(x.ok())) // Ignore all `Result::Err`
.filter_map(|x| future::ok(x)) // Ignore all `Option::None`
.next(); // Get first value
match executor::block_on(success) {
Ok((Some(v), _s)) => println!("First success: {}", v),
Ok((None, _s)) => println!("No successful requests"),
Err(_) => unreachable!("Impossible to fail"),
}
}
Resolving 1 at Instant { tv_sec: 3428278, tv_nsec: 513758474 }
Resolving 2 at Instant { tv_sec: 3428278, tv_nsec: 614059691 }
Resolving 3 at Instant { tv_sec: 3428278, tv_nsec: 714256066 }
First success: 300
Можно ли построить динамическую цепочку, как это
Да, но это требует значительного количества дополнительного распределения и косвенного обращения, а также требует наличия по крайней мере одного значения для доступа:
fn requests_in_sequence(vals: Vec<i32>) -> Box<Future<Item = i32, Error = Error>> {
let mut vals = vals.into_iter();
let val1 = vals.next().expect("Need at least one value to start from");
vals.fold(Box::new(network_request(val1)), |acc, val| {
Box::new(acc.or_else(move |_| network_request(val)))
})
}
Смотрите также:
это требование для полной оценки каждого
Future
в итераторе, прежде чем перейти к следующему
Разве это не часть ваших собственных требований? Акцент мой:
Запрашиваемые данные будут проверяться с каждым из источников по очереди. Если первый источник имел ошибку (
Err
), или не было доступных данных (None
), тогда будет опробован второй источник