Потоковая загрузка в s3 с помощью rusoto
Как с помощью rusoto закачать файл в s3, не читая содержимое файла в память (потоковое)?
С этим кодом:
use std::fs::File;
use std::io::BufReader;
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};
fn main() {
let file = File::open("input.txt").unwrap();
let mut reader = BufReader::new(file);
let s3_client = S3Client::new(Region::UsEast1);
let result = s3_client.put_object(PutObjectRequest {
bucket: String::from("example_bucket"),
key: "example_filename".to_string(),
// this works:
// body: Some("example string".to_owned().into_bytes().into()),
// this doesn't:
body: Some(StreamingBody::new(reader)),
..Default::default()
}).sync().expect("could not upload");
}
Я получаю следующую ошибку:
error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied --> src/bin/example.rs:18:20 | 18 | body: Some(StreamingBody::new(reader)), | ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>` | = note: required by `rusoto_core::stream::ByteStream::new`
2 ответа
Хорошо. Пристегните себя, это весело.
StreamingBody
это псевдоним для ByteStream
, который сам принимает тип параметра S: Stream<Item = Bytes, Error = Error> + Send + 'static
. Короче говоря, это должен быть поток байтов.
BufReader
, очевидно, не реализует эту черту, так как она намного предшествует фьючерсам и потокам. Также нет простого преобразования вStream<Item = Bytes>
который вы можете использовать для неявного преобразования в this.
Причина, по которой первый (прокомментированный) пример работает, заключается в том, что String::into_bytes().into()
будет следовать цепочке приведения типов: String
-> Vec<u8>
-> ByteStream
благодаря реализации From<Vec<u8>>
на ByteStream
.
Теперь, когда мы знаем, почему это не работает, мы можем это исправить. Есть быстрый путь, и есть правильный путь. Я покажу вам обоих.
Быстрый путь
Быстрый (но не оптимальный) способ - просто позвонить File::read_to_end()
. Это заполнитVec<u8>
, который затем можно использовать, как и раньше:
let mut buf:Vec<u8> = vec![];
file.read_to_end(&mut buf)?;
// buf now contains the entire file
Это неэффективно и неоптимально по двум причинам:
read_to_end()
это блокирующий звонок. В зависимости от того, откуда вы читаете файл, это время блокировки может оказаться необоснованным.- У вас должно быть больше свободной оперативной памяти, чем байтов в вашем файле (+ 64 или 128 бит для
Vec
определение + немного лишнего, о котором мы особо не заботимся)
Хороший способ
Хороший способ превратить ваш файл в структуру, реализующую AsyncRead
. Отсюда мы можем сформироватьStream
.
Поскольку у вас уже есть std::fs::File
, сначала преобразуем его в tokio::fs::File
. Это реализуетAsyncRead
, что очень важно для дальнейшего:
let tokio_file = tokio::fs::File::from_std(file);
К сожалению, нам нужно проделать некоторую конвейерную работу, чтобы поместить его в Stream
. Это реализовано в нескольких ящиках; способ сделать это с нуля следующий:
use tokio_util::codec;
let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new())
.map(|r| r.as_ref().to_vec());
byte_stream
это пример tokio_util::codec::FramedRead
который реализуетStream
с конкретным элементом на основе нашего декодера. Поскольку наш декодер BytesCodec
, значит, ваш поток Stream<Item = BytesMut>
.
Поскольку детская площадка не знает rusoto_core
, Я не могу показать вам полный поток. Однако я могу показать вам, что вы можете создатьStream<Item = Vec<u8>, Error = io::Error>
, в чем суть этого: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133
Вот версия с предстоящим синтаксисом Rusoto async-await (для getObject, хотя должно быть легко настроить для загрузки)... возможно, для публичного использования в Rusoto 0.4.3:
https://github.com/brainstorm/rusoto-s3-async-await
А именно:
pub async fn bucket_obj_bytes(client: S3Client, bucket: String, _prefix: String, object: String) {
let get_req = GetObjectRequest {
bucket,
key: object,
..Default::default()
};
let result = client
.get_object(get_req)
.await
.expect("Couldn't GET object");
println!("get object result: {:#?}", result);
let stream = result.body.unwrap();
let body = stream.map_ok(|b| BytesMut::from(&b[..])).try_concat().await.unwrap();
assert!(body.len() > 0);
dbg!(body);
}
Что по сути заимствовано из самого пакета интеграционных тестов, где вы также можете найти фрагменты загружаемой версии.