Как преобразовать futures_io :: AsyncRead в rusoto :: ByteStream?

Я пытаюсь создать службу, которая извлекает файлы с сервера SFTP и загружает их в S3.

Для части SFTP я использую async-ssh2, который дает мне обработчик файлов, реализующий futures::AsyncRead. Поскольку эти файлы SFTP могут быть довольно большими, я пытаюсь изменить этоFile обработчик в ByteStream которые я могу загрузить с помощью Rusoto. Похоже наByteStream может быть инициализирован с помощью futures::Stream.

Мой план состоял в том, чтобы реализовать Stream на Fileобъект (на основе кода здесь) для совместимости с Rusoto (код воспроизводится ниже для потомков):

use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};

pub struct ByteStream<R>(R);

impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0; 1];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
            Ok(n) if n != 0 => Some(buf[0]).into(),
            _ => None.into(),
        }
    }
}

Будет ли это хороший способ сделать это? Я видел этот вопрос, но, похоже, он использует tokio::io::AsyncRead. Используетtokioканонический способ сделать это? Если да, то есть ли способ конвертировать изfutures_io::AsyncRead к tokio::io::AsyncRead?

1 ответ

Решение

Именно так я и сделал преобразование. Я основал это на приведенном выше коде, за исключением того, что я использовал больший буфер (8 КБ), чтобы уменьшить количество сетевых вызовов.

use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
use futures_io::AsyncRead;
use rusoto_s3::StreamingBody;

const KB: usize = 1024;

struct ByteStream<R>(R);

impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = Result<Bytes, std::io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = vec![0_u8; 8 * KB];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf[..])) {
            Ok(n) if n != 0 => Some(Ok(Bytes::from(buf))).into(),
            Ok(_) => None.into(),
            Err(e) => Some(Err(e)).into(),
        }
    }
}

Позволив мне сделать это:

fn to_streamingbody(body: async_ssh2::File) -> Option<StreamingBody> {
    let stream = ByteStream(body);
    Some(StreamingBody::new(stream))
}

(Обратите внимание, что rusoto::StreamingBody а также rusoto::ByteStream псевдонимы)

Другие вопросы по тегам