gpt4 book ai didi

rust - 将AsyncRead转换为TryStream字节的最佳方法是什么?

转载 作者:行者123 更新时间:2023-12-03 11:23:56 27 4
gpt4 key购买 nike

我有一个AsyncRead,想将其转换为带有tokio 0.2和Futures 0.3的Stream<Item = tokio::io::Result<Bytes>>

我能做的最好的事情是:

use bytes::Bytes; // 0.4.12
use futures::stream::{Stream, TryStreamExt};; // 0.3.1
use tokio::{fs::File, io::Result}; // 0.2.4
use tokio_util::{BytesCodec, FramedRead}; // 0.2.0

#[tokio::main]
async fn main() -> Result<()> {
let file = File::open("some_file.txt").await?;
let stream = FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
fn_that_takes_stream(stream)
}

fn fn_that_takes_stream<S, O>(s: S) -> Result<()>
where
S: Stream<Item = Result<Bytes>>,
{
//...
Ok(())
}

似乎应该有一个更简单的方法。我很惊讶Tokio没有包含一个编解码器来获取 Bytes流而不是 BytesMut,或者不只是扩展特性提供了一种将 AsyncRead转换为 Stream的方法。我想念什么吗?

最佳答案

关于AsyncRead crate 中定义的stream::*futures-0.3,有

fn stream::TryStreamExt::into_async_read(self) -> IntoAsyncRead<Self>

但并非相反。这种差异很烦人,希望可以在 async/await生态系统的重要版本达到1.0之前解决。目前,我已经看到了几种自行完成的方法:
  • 为承载Stream的结构实现AsyncRead特性
  • 利用futures实用程序功能,例如future::poll_fn
  • OP的方法

  • IMO的第三个是最直接的。这是一些工作代码:

    //# bytes = "0.5.3"
    //# futures = "0.3.1"
    //# tokio = { version = "0.2.4", features = ["full"] }
    //# tokio-util = { version = "0.2.0", features = ["codec"] }
    use bytes::Bytes;
    use futures::stream::{self, Stream, StreamExt, TryStreamExt};
    use tokio::io::{AsyncRead, Result};
    use tokio_util::codec;

    fn into_byte_stream<R>(r: R) -> impl Stream<Item=Result<u8>>
    where
    R: AsyncRead,
    {
    codec::FramedRead::new(r, codec::BytesCodec::new())
    .map_ok(|bytes| stream::iter(bytes).map(Ok))
    .try_flatten()
    }

    fn into_bytes_stream<R>(r: R) -> impl Stream<Item=Result<Bytes>>
    where
    R: AsyncRead,
    {
    codec::FramedRead::new(r, codec::BytesCodec::new())
    .map_ok(|bytes| bytes.freeze())
    }

    #[tokio::main]
    async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    let reader = std::io::Cursor::new([114, 117, 115, 116]);
    let res = into_byte_stream(reader)
    .try_collect::<Vec<_>>()
    .await?;
    dbg!(res);

    let reader = std::io::Cursor::new([114, 117, 115, 116]);
    let res = into_bytes_stream(reader)
    .try_collect::<Vec<_>>()
    .await?;
    dbg!(res);

    Ok(())
    }

    (OP要求提供 TryStream。但是 futures-0.3具有 impl<S, T, E> TryStream for S where S: Stream<Item = Result<T, E>> + ?Sized,我们免费获得它。)

    我提交了 a ticket for futures-rs project来问为什么。事实证明,这比我最初想象的要复杂得多。 TL; DR的意思是,只有在交付通用关联类型(GAT)之后(希望在明年),我们才能令人满意地解决此问题。 Niko的 async interview #2对此进行了相当深入的讨论。

    关于rust - 将AsyncRead转换为TryStream字节的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59318460/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com