Как объединить итератор потоков?
Я пишу программу, которая очищает ссылки с https://www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/, используя Rust. я использую hyper
а также futures
,
У меня есть коллекция ссылок на каждый раздел и использовать stream::unfold
построить доступ к каждой странице:
// Returns the links scraped, and probably Uri to the next page.
fn process_body_and_return_next(body: Body) -> (Vec<String>, Option<Uri>) { ... }
// In main()
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle);
let uris = ...
let jobs = uris.map(|uri| {
stream::unfold(Some(uri), |uri| {
uri.map(|uri| {
client
.get(uri)
.and_then(|res| res.body().concat2())
.map(process_body_and_return_next)
})
})
});
Теперь у меня есть impl Iterator<Item = impl Stream<Item = Vec<String>>>
, Как я могу объединить его в один Stream
из Vec
с, как использовать stream::futures_unordered
объединить Future
s?
Изменить: я пытался объединить stream::iter_ok
а также stream::Stream::flatten
:
let flattened = ::futures::stream::iter_ok(jobs)
.flatten();
Но это неэффективно, так как я хочу отправить несколько веб-запросов асинхронно. Комбинированный Stream
должен производить ценность всякий раз, когда внутренний Stream
готов.
2 ответа
Итератор может быть превращен в поток с помощью futures::stream::iter_ok
это позволяет превратить ваш итератор потоков в поток потоков:
::futures::stream::iter_ok(jobs)
Затем вы можете сгладить этот поток потоков в один поток всех элементов, используя Stream::flatten()
:
let flattened = ::futures::stream::iter_ok(jobs)
.flatten();
select
комбинатор занимает два Stream
s и дает, когда один из двух потоков готов.
Чтобы выбрать из более чем двух потоков, вы можете связать select
, Однако, так как вы не знаете заранее количество потоков, которые вы должны выбрать, вам придется поместить промежуточные потоки в рамки, чтобы стереть определенные Stream
типа, так что программа проверяет тип.
extern crate futures;
use futures::Stream;
fn select_all<'a, I, T, E>(seq: I) -> Box<Stream<Item = T, Error = E> + 'a>
where
I: IntoIterator,
I::Item: Stream<Item = T, Error = E> + 'a,
T: 'a,
E: 'a,
{
let mut iter = seq.into_iter();
let mut result = Box::new(iter.next().expect("got an empty list of streams"))
as Box<Stream<Item = T, Error = E>>;
while let Some(next) = iter.next() {
result = Box::new(result.select(next));
}
result
}
Хотя, безусловно, есть более эффективный способ реализовать это. E сть select_all
комбинатор для фьючерсов, но пока нет ни одного для потоков. Возможно, вы могли бы реализовать это самостоятельно и отправить его как запрос на извлечение!