gpt4 book ai didi

rust - 如何在Rust中使用组合器(for_each除外)在不使用while循环的情况下将流运行至完成状态?

转载 作者:行者123 更新时间:2023-12-03 11:44:35 24 4
gpt4 key购买 nike

我有一个使用合并器的流,我需要运行它才能完成。我可以使用while循环或for_each组合器。它们都可以工作,但是我认为必须有更好的方法。
Sink 看起来像我想要的东西,尤其是 sink::drain() ,但是我无法理解如何使用它。
使用while循环

use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
let mut stream = Box::pin(
futures::stream::iter(0..20)
.map(foo)
.map_ok(|x| x * 10)
.and_then(bar)
.filter(|x| futures::future::ready(x.is_ok())),
);

while let Some(_) = stream.next().await {
// Nothing to do here. I just need to run stream.
}
}

fn foo(x: i32) -> Result<i32, String> {
if x != 10 {
Ok(x)
} else {
Err("eeer".to_string())
}
}

async fn bar(x: i32) -> Result<(), String> {
async {
if x == 13 {
Err("errr".to_string())
} else {
Ok(())
}
}
.await
}
使用 for_each:
use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
futures::stream::iter(0..20)
.map(foo)
.map_ok(|x| x * 10)
.and_then(bar)
.filter(|x| futures::future::ready(x.is_ok()))
.for_each(|_| futures::future::ready(())) // Nothing to do here, just to run stream
.await;
}

fn foo(x: i32) -> Result<i32, String> {
if x != 10 {
Ok(x)
} else {
Err("eeer".to_string())
}
}

async fn bar(x: i32) -> Result<(), String> {
async {
if x == 13 {
Err("errr".to_string())
} else {
Ok(())
}
}
.await
}
我想要类似以下内容。不必完全使用 drain组合器,只需使用一些组合器即可运行流:
use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
futures::stream::iter(0..20)
.map(foo)
.map_ok(|x| x * 10)
.and_then(bar)
.filter(|x| futures::future::ready(x.is_ok()))
.forward(futures::sink::drain())
.await;
}

fn foo(x: i32) -> Result<i32, String> {
if x != 10 {
Ok(x)
} else {
Err("eeer".to_string())
}
}

async fn bar(x: i32) -> Result<(), String> {
async {
if x == 13 {
Err("errr".to_string())
} else {
Ok(())
}
}
.await
}
这是行不通的,可能是因为流失对 Error类型施加了一些限制:
error[E0271]: type mismatch resolving `<futures::sink::Drain<()> as futures::Sink<()>>::Error == std::string::String`
--> src/main.rs:11:10
|
11 | .forward(futures::sink::drain())
| ^^^^^^^ expected enum `std::convert::Infallible`, found struct `std::string::String`

error[E0271]: type mismatch resolving `<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]> as futures::Stream>::Item == std::result::Result<(), std::convert::Infallible>`
--> src/main.rs:6:5
|
6 | / futures::stream::iter(0..20)
7 | | .map(foo)
8 | | .map_ok(|x| x * 10)
9 | | .and_then(bar)
10 | | .filter(|x| futures::future::ready(x.is_ok()))
11 | | .forward(futures::sink::drain())
12 | | .await;
| |______________^ expected struct `std::string::String`, found enum `std::convert::Infallible`
|
= note: expected enum `std::result::Result<_, std::string::String>`
found enum `std::result::Result<_, std::convert::Infallible>`
= note: required because of the requirements on the impl of `futures::Future` for `futures_util::stream::stream::forward::Forward<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]>, futures::sink::Drain<()>, ()>`

最佳答案

Sink特质是容易犯错的(没有TrySink),但是drain()返回一个Drain,其ErrorInfallibleStream::forward()要求流是容易出错的(实际上是TryStream),并且具有与给定接收器相同的错误类型。您的代码失败,因为您的错误类型为String,并且无法清除。
由于您正在过滤is_ok结果,因此该解决方案可以解包和重新打包值:

#[tokio::main]
async fn main() {
futures::stream::iter(0..20)
.map(foo)
.map_ok(|x| x * 10)
.and_then(bar)
.filter(|x| futures::future::ready(x.is_ok()))
.map(|x| Ok(x.unwrap())) // <---- rewrap!
.forward(futures::sink::drain())
.await.unwrap();
}
我觉得应该有一种更简单的方法来构建 Result<_, Infallible>,但是我不知道怎么做。您可以编写 map_err(|_| panic!()),但这几乎没有更好的选择。

关于rust - 如何在Rust中使用组合器(for_each除外)在不使用while循环的情况下将流运行至完成状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64443085/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com