Потоковая загрузка в s3 с помощью rusoto

Как с помощью rusoto закачать файл в s3, не читая содержимое файла в память (потоковое)?


С этим кодом:

use std::fs::File;
use std::io::BufReader;

use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};

fn main() {
    let file = File::open("input.txt").unwrap();
    let mut reader = BufReader::new(file);

    let s3_client = S3Client::new(Region::UsEast1);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("example_bucket"),
        key: "example_filename".to_string(),
//        this works:
//      body: Some("example string".to_owned().into_bytes().into()),
//        this doesn't:
        body: Some(StreamingBody::new(reader)),
        ..Default::default()
    }).sync().expect("could not upload");
}

Я получаю следующую ошибку:

error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied
  --> src/bin/example.rs:18:20
   |
18 |         body: Some(StreamingBody::new(reader)),
   |                    ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>`
   |
   = note: required by `rusoto_core::stream::ByteStream::new`

2 ответа

Решение

Хорошо. Пристегните себя, это весело.

StreamingBody это псевдоним для ByteStream, который сам принимает тип параметра S: Stream<Item = Bytes, Error = Error> + Send + 'static. Короче говоря, это должен быть поток байтов.

BufReader, очевидно, не реализует эту черту, так как она намного предшествует фьючерсам и потокам. Также нет простого преобразования вStream<Item = Bytes> который вы можете использовать для неявного преобразования в this.

Причина, по которой первый (прокомментированный) пример работает, заключается в том, что String::into_bytes().into() будет следовать цепочке приведения типов: String -> Vec<u8> -> ByteStream благодаря реализации From<Vec<u8>> на ByteStream.

Теперь, когда мы знаем, почему это не работает, мы можем это исправить. Есть быстрый путь, и есть правильный путь. Я покажу вам обоих.

Быстрый путь

Быстрый (но не оптимальный) способ - просто позвонить File::read_to_end(). Это заполнитVec<u8>, который затем можно использовать, как и раньше:

 let mut buf:Vec<u8> = vec![];
 file.read_to_end(&mut buf)?;
 // buf now contains the entire file

Это неэффективно и неоптимально по двум причинам:

  • read_to_end()это блокирующий звонок. В зависимости от того, откуда вы читаете файл, это время блокировки может оказаться необоснованным.
  • У вас должно быть больше свободной оперативной памяти, чем байтов в вашем файле (+ 64 или 128 бит для Vec определение + немного лишнего, о котором мы особо не заботимся)

Хороший способ

Хороший способ превратить ваш файл в структуру, реализующую AsyncRead. Отсюда мы можем сформироватьStream.

Поскольку у вас уже есть std::fs::File, сначала преобразуем его в tokio::fs::File. Это реализуетAsyncRead, что очень важно для дальнейшего:

let tokio_file = tokio::fs::File::from_std(file);

К сожалению, нам нужно проделать некоторую конвейерную работу, чтобы поместить его в Stream. Это реализовано в нескольких ящиках; способ сделать это с нуля следующий:

use tokio_util::codec;
let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new())
   .map(|r| r.as_ref().to_vec());

byte_stream это пример tokio_util::codec::FramedReadкоторый реализуетStream с конкретным элементом на основе нашего декодера. Поскольку наш декодер BytesCodec, значит, ваш поток Stream<Item = BytesMut>.

Поскольку детская площадка не знает rusoto_core, Я не могу показать вам полный поток. Однако я могу показать вам, что вы можете создатьStream<Item = Vec<u8>, Error = io::Error>, в чем суть этого: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133

Вот версия с предстоящим синтаксисом Rusoto async-await (для getObject, хотя должно быть легко настроить для загрузки)... возможно, для публичного использования в Rusoto 0.4.3:

https://github.com/brainstorm/rusoto-s3-async-await

А именно:

pub async fn bucket_obj_bytes(client: S3Client, bucket: String, _prefix: String, object: String) {
    let get_req = GetObjectRequest {
        bucket,
        key: object,
        ..Default::default()
    };

    let result = client
        .get_object(get_req)
        .await
        .expect("Couldn't GET object");
    println!("get object result: {:#?}", result);

    let stream = result.body.unwrap();
    let body = stream.map_ok(|b| BytesMut::from(&b[..])).try_concat().await.unwrap();

    assert!(body.len() > 0);
    dbg!(body);
}

Что по сути заимствовано из самого пакета интеграционных тестов, где вы также можете найти фрагменты загружаемой версии.

Другие вопросы по тегам