Присоединяйтесь к фьючерсам с ограниченным параллелизмом

У меня большой вектор фьючерсов Hyper HTTP-запросов и я хочу преобразовать их в вектор результатов. Поскольку существует ограничение на максимальное количество открытых файлов, я хочу ограничить параллелизм N фьючерсами.

Я экспериментировал с Stream::buffer_unordered но кажется, что он исполняет фьючерсы один за другим.

1 ответ

Мы использовали такой код в проекте, чтобы не открывать слишком много сокетов TCP. У этих фьючерсов есть Гипер фьючерсы внутри, так что, похоже , тот же самый случай.

// Convert the iterator into a `Stream`. We will process
// `PARALLELISM` futures at the same time, but with no specified
// order.
let all_done =
    futures::stream::iter(iterator_of_futures.map(Ok))
    .buffer_unordered(PARALLELISM);

// Everything after here is just using the stream in
// some manner, not directly related

let mut successes = Vec::with_capacity(LIMIT);
let mut failures = Vec::with_capacity(LIMIT);

// Pull values off the stream, dividing them into success and
// failure buckets.
let mut all_done = all_done.into_future();
loop {
    match core.run(all_done) {
        Ok((None, _)) => break,
        Ok((Some(v), next_all_done)) => {
            successes.push(v);
            all_done = next_all_done.into_future();
        }
        Err((v, next_all_done)) => {
            failures.push(v);
            all_done = next_all_done.into_future();
        }
    }
}

Это используется в примере кода, поэтому цикл обработки событий (core) явно управляемый. Просмотр количества файловых дескрипторов, используемых программой, показал, что она ограничена. Кроме того, до того, как было добавлено это узкое место, мы быстро исчерпали допустимые дескрипторы файлов, тогда как впоследствии мы этого не сделали.

Другие вопросы по тегам