gpt4 book ai didi

rust - rust 铸就 future 的沉沦

转载 作者:行者123 更新时间:2023-12-03 11:36:39 25 4
gpt4 key购买 nike

我想将一些读取器/写入器转换为元素的记录管道。我设法按照this answer使用futures::stream::unfold来使读者->流方向。但是,我在使用接收器->写入器时遇到了麻烦。我基本上是在寻找unfold的逆函数。
我知道有AsyncWriterExt::into_sink,但这仅在我可以生成所有字节以成批写入的情况下才有效。我还发现this answer建议在drain()之后使用with()。但是由于存在生命周期问题,此方法不起作用(FnMut无法有效地存储对writer的引用,或者至少我没有做到这一点。
因此,我正在寻找的是一些我可以调用的函数,例如fold(initial_state, |element| {(writer.write(element).await, new_state)})。您明白了(我希望)。
我也看到有async_codec,但对我来说似乎有点过头了。同时,我求助于将所有写入存储为流,然后使用writer.into_sink().with_flat_map()。但这真的很丑。

最佳答案

编辑:我显然不是唯一想要这样做的人,请参阅upstream implementation。 future 的用户(呵呵)将可以简单地使用futures::sink::unfold

好的,我鼓起勇气,根据futures::stream::unfold一起砍了一些东西:

fn fold<T, F, Fut, Item, E>(init: T, f: F) -> FoldSink<T, F, Fut>
where
F: FnMut(T, Item) -> Fut,
Fut: Future<Output = Result<T, E>>
{
FoldSink {
f,
state: Some(init),
fut: None,
}
}


use pin_project::pin_project;

#[pin_project]
struct FoldSink<T, F, Fut> {
f: F,
state: Option<T>,
#[pin]
fut: Option<Fut>,
}

impl <T, Item, F, Fut, E> futures::sink::Sink<Item> for FoldSink<T, F, Fut>
where
F: FnMut(T, Item) -> Fut,
Fut: Future<Output = Result<T, E>>
{
type Error = E;

fn poll_ready(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
let mut this = self.project();

match this.fut.as_mut().as_pin_mut() {
Some(fut) => {
match fut.poll(ctx) {
Poll::Ready(Ok(new_state)) => {
this.fut.set(None);
*this.state = Some(new_state);
Poll::Ready(Ok(()))
},
Poll::Ready(Err(e)) => {
this.fut.set(None);
Poll::Ready(Err(e))
},
Poll::Pending => Poll::Pending,
}
},
None => {
Poll::Ready(Ok(()))
}
}
}

fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), E> {
let mut this = self.project();
this.fut.set(Some((this.f)(this.state.take().expect("todo invalid state"), item)));
Ok(())
}

fn poll_flush(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
self.poll_ready(ctx)
}

fn poll_close(mut self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
futures::ready!(self.as_mut().poll_ready(ctx))?;
let this = self.project();
this.state.take().unwrap();
Poll::Ready(Ok(()))
}
}
欢迎评论和改进!

关于rust - rust 铸就 future 的沉沦,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65044743/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com