Как лучше всего преобразовать AsyncRead в TryStream байтов?

У меня есть AsyncRead и хотите преобразовать его в Stream<Item = tokio::io::Result<Bytes>> с токио 0.2 и фьючерсами 0.3.

Лучшее, что мне удалось сделать, это что-то вроде:

use bytes::Bytes; // 0.4.12
use futures::stream::{Stream, TryStreamExt};; // 0.3.1
use tokio::{fs::File, io::Result}; // 0.2.4
use tokio_util::{BytesCodec, FramedRead}; // 0.2.0

#[tokio::main]
async fn main() -> Result<()> {
    let file = File::open("some_file.txt").await?;
    let stream = FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
    fn_that_takes_stream(stream)
}

fn fn_that_takes_stream<S, O>(s: S) -> Result<()>
where
    S: Stream<Item = Result<Bytes>>,
{
    //...
    Ok(())
}

Кажется, должен быть способ попроще; Я удивлен, что Tokio не включает кодек для получения потокаBytes вместо того BytesMut или что нет просто признака расширения, который предоставляет метод для преобразования AsyncRead в Stream. Я что-то упускаю?

2 ответа

Что касается AsyncRead а также stream::* как определено в futures-0.3 ящик, есть

fn stream::TryStreamExt::into_async_read(self) -> IntoAsyncRead<Self>

но не наоборот. Это несоответствие раздражает и, надеюсь, может быть устранено до появления важных ящиковasync/awaitэкосистема достигла 1.0. На данный момент я видел несколько способов сделать это самому:

  • осуществлять Stream черта для структуры, содержащей AsyncRead

  • использовать futures служебные функции, такие как future::poll_fn

  • Подход OP

ИМО третий - самый простой. Вот рабочий код:

//# bytes = "0.5.3"
//# futures = "0.3.1"
//# tokio = { version = "0.2.4", features = ["full"] }
//# tokio-util = { version = "0.2.0", features = ["codec"] }
use bytes::Bytes;
use futures::stream::{self, Stream, StreamExt, TryStreamExt};
use tokio::io::{AsyncRead, Result};
use tokio_util::codec;

fn into_byte_stream<R>(r: R) -> impl Stream<Item=Result<u8>>
where
    R: AsyncRead,
{
    codec::FramedRead::new(r, codec::BytesCodec::new())
        .map_ok(|bytes| stream::iter(bytes).map(Ok))
        .try_flatten()
}

fn into_bytes_stream<R>(r: R) -> impl Stream<Item=Result<Bytes>>
where
    R: AsyncRead,
{
    codec::FramedRead::new(r, codec::BytesCodec::new())
        .map_ok(|bytes| bytes.freeze())
}

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    let reader = std::io::Cursor::new([114, 117, 115, 116]);
    let res = into_byte_stream(reader)
        .try_collect::<Vec<_>>()
        .await?;
    dbg!(res);

    let reader = std::io::Cursor::new([114, 117, 115, 116]);
    let res = into_bytes_stream(reader)
        .try_collect::<Vec<_>>()
        .await?;
    dbg!(res);

    Ok(())
}

(OP попросил TryStream. Ноfutures-0.3 имеет impl<S, T, E> TryStream for S where S: Stream<Item = Result<T, E>> + ?Sized, мы получаем его бесплатно.)


Я подал заявку наfutures-rsпроект, чтобы спросить, почему. Оказывается, это намного сложнее, чем я думал изначально. TL;DR заключается в том, что только после отправки общих связанных типов (GAT), что, надеюсь, произойдет в следующем году, мы сможем удовлетворительно решить эту проблему. Асинхронное интервью № 2 Нико подробно обсуждает это.

Если вы можете использовать tokio 1.0 или 0.3, tokio-util теперь имеетtokio_util::io::ReaderStreamначиная с версии 0.4.

      let file = File::open("some_file.txt").await?;
let stream = tokio_util::io::ReaderStream::new(file);
fn_that_takes_stream(stream)
Другие вопросы по тегам