Как мне записать futures::Stream на диск, не сохраняя его полностью в памяти?

Вот пример загрузки файла с помощью Rusoto S3 здесь: Как сохранить файл, загруженный с S3 с помощью Rusoto, на мой жесткий диск?

Проблема в том, что, похоже, он загружает весь файл в память, а затем записывает его на диск, потому что он использует write_all метод, который принимает массив байтов, а не поток. Как я могу использовать StreamingBody, который реализует futures::Stream для потоковой передачи файла на диск?

1 ответ

Решение

Поскольку StreamingBody инвентарь Stream<Item = Vec<u8>, Error = Error> мы можем построить MCVE, который представляет, что:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

type Error = Box<std::error::Error>;

fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
    const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
    let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
    stream::iter_ok(iter_of_owned_bytes)
}

Затем мы можем как-то получить "потоковое тело" и использовать Stream::for_each обрабатывать каждый элемент в Stream, Здесь мы просто называем write_all с некоторым предоставленным выходным местоположением:

use std::{fs::File, io::Write};

fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
    streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}

Затем мы можем написать небольшое тестирование:

fn main() {
    let mut file = Vec::new();

    {
        let fut = save_to_disk(&mut file);
        fut.wait().expect("Could not drive future");
    }

    assert_eq!(file, b"0123456789ABCDEF");
}

Важные замечания о качестве этой наивной реализации:

  1. Призыв к write_all потенциально может заблокировать, что вы не должны делать в асинхронной программе. Было бы лучше передать эту блокирующую работу пулу потоков.

  2. Использование Future::wait принудительно блокирует поток до тех пор, пока не будет создано будущее, что отлично подходит для тестов, но может не подходить для вашего реального варианта использования.

Смотрите также:

Другие вопросы по тегам