Как объединить итератор потоков?

Я пишу программу, которая очищает ссылки с https://www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/, используя Rust. я использую hyper а также futures,

У меня есть коллекция ссылок на каждый раздел и использовать stream::unfold построить доступ к каждой странице:

// Returns the links scraped, and probably Uri to the next page.
fn process_body_and_return_next(body: Body) -> (Vec<String>, Option<Uri>) { ... }

// In main()
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle);
let uris = ...

let jobs = uris.map(|uri| {
    stream::unfold(Some(uri), |uri| {
        uri.map(|uri| {
            client
                .get(uri)
                .and_then(|res| res.body().concat2())
                .map(process_body_and_return_next)
        })
    })
});

Теперь у меня есть impl Iterator<Item = impl Stream<Item = Vec<String>>>, Как я могу объединить его в один Stream из Vec с, как использовать stream::futures_unordered объединить Future s?


Изменить: я пытался объединить stream::iter_ok а также stream::Stream::flatten:

let flattened = ::futures::stream::iter_ok(jobs)
    .flatten();

Но это неэффективно, так как я хочу отправить несколько веб-запросов асинхронно. Комбинированный Stream должен производить ценность всякий раз, когда внутренний Stream готов.

2 ответа

Итератор может быть превращен в поток с помощью futures::stream::iter_ok это позволяет превратить ваш итератор потоков в поток потоков:

::futures::stream::iter_ok(jobs)

Затем вы можете сгладить этот поток потоков в один поток всех элементов, используя Stream::flatten():

let flattened = ::futures::stream::iter_ok(jobs)
    .flatten();

select комбинатор занимает два Stream s и дает, когда один из двух потоков готов.

Чтобы выбрать из более чем двух потоков, вы можете связать select, Однако, так как вы не знаете заранее количество потоков, которые вы должны выбрать, вам придется поместить промежуточные потоки в рамки, чтобы стереть определенные Stream типа, так что программа проверяет тип.

extern crate futures;

use futures::Stream;

fn select_all<'a, I, T, E>(seq: I) -> Box<Stream<Item = T, Error = E> + 'a>
where
    I: IntoIterator,
    I::Item: Stream<Item = T, Error = E> + 'a,
    T: 'a,
    E: 'a,
{
    let mut iter = seq.into_iter();
    let mut result = Box::new(iter.next().expect("got an empty list of streams"))
        as Box<Stream<Item = T, Error = E>>;
    while let Some(next) = iter.next() {
        result = Box::new(result.select(next));
    }

    result
}

Хотя, безусловно, есть более эффективный способ реализовать это. E сть select_all комбинатор для фьючерсов, но пока нет ни одного для потоков. Возможно, вы могли бы реализовать это самостоятельно и отправить его как запрос на извлечение!

Другие вопросы по тегам