gpt4 book ai didi

rust - 为什么使用异步 block 然后使我的流取消固定?

转载 作者:行者123 更新时间:2023-12-03 11:25:58 27 4
gpt4 key购买 nike

我是 Rust 的新手,如果我使用的术语不正确,我很抱歉。也许我对问题的用词选择不正确。
我正在玩流,我需要在流元素之间有一些延迟。所以我写了这个:

use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
let mut stream = stream::iter(0..1000).then(|x| async move {
time::delay_for(std::time::Duration::from_millis(500)).await;
x + 1
});
while let Some(x) = stream.next().await {
println!("{:?}", x)
}
}
我收到很多编译错误,但最重要的错误与固定有关。他们来了:
error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>` cannot be unpinned
--> src/main.rs:11:32
|
11 | while let Some(x) = stream.next().await {
| ^^^^ within `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>`
|
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
= note: required because it appears within the type `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
= note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`

error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>` cannot be unpinned
--> src/main.rs:11:25
|
11 | while let Some(x) = stream.next().await {
| ^^^^^^^^^^^^^^^^^^^ within `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:7:64: 10:6 x:_ _]>`
|
= note: required because it appears within the type `impl core::future::future::Future`
= note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
= note: required because it appears within the type `futures_util::stream::stream::then::_::__Then<'_, futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
= note: required because of the requirements on the impl of `std::marker::Unpin` for `futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>`
= note: required because of the requirements on the impl of `core::future::future::Future` for `futures_util::stream::stream::next::Next<'_, futures_util::stream::stream::then::Then<futures_util::stream::iter::Iter<std::ops::Range<{integer}>>, impl core::future::future::Future, [closure@src/main.rs:7:49: 10:6]>>`
如果我将代码更改为:
use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
let mut stream = stream::iter(0..1000).then(|x| {
futures::future::ready(x + 1)
});
while let Some(x) = stream.next().await {
println!("{:?}", x)
}
}
或者到这个:
use futures::stream;
use futures::StreamExt;
use tokio::time;

#[tokio::main]
async fn main() {
stream::iter(0..1000)
.then(|x| async move {
time::delay_for(std::time::Duration::from_millis(500)).await;
x + 1
})
.for_each(|x| async move { println!("{:?}", x) })
.await;
}
它编译。
我认为这与 then 的固定和同时使用有关组合器和 while ,但我无法绕过它。

最佳答案

我认为问题归结为async块不是 Unpin .可以用这段代码证明:

fn check_unpin<F: Unpin>(_: F) { }

fn main() {
check_unpin(async {});
}
以相当神秘的消息失败:
error[E0277]: `std::future::from_generator::GenFuture<[static generator@src/main.rs:4:23: 4:25 _]>` cannot be unpinned
--> src/main.rs:4:5
|
1 | fn check_unpin<F: Unpin>(_: F) { }
| ----- required by this bound in `check_unpin`
...
4 | check_unpin(async {});
| ^^^^^^^^^^^ within `impl std::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@src/main.rs:4:23: 4:25 _]>`
|
= note: required because it appears within the type `impl std::future::Future`
我认为 GenFuture是转换 async 的内部类型块成 impl Future .
现在回到您的问题,组合子 then()返回 Unpin值如果流和 Future关闭返回的是 Unpin (在 documentation 中并不完全清楚,但我从源代码中推断出这一点)。 stream::iter is Unpin ,但是当你写 |x| async move { x + 1}您正在返回 async不是 Unpin 的块,因此你的错误。
如果您使用 futures::future::ready(x + 1)它的工作原理很简单,因为 Future implements Unpin .
如果您使用 StreamExt::for_each它有效,因为它不需要 Self成为 Unpin .不是 Unpin本身,但没关系,因为您将其发送至 tokio::main在轮询之前将所有内容固定在内部。
如果你想让你的原始代码工作,你只需要手动固定你的流( playground ):
use futures::stream;
use futures::StreamExt;
use tokio::time;
use pin_utils::pin_mut;

#[tokio::main]
async fn main() {
let stream = stream::iter(0..1000).then(|x| async move {
time::delay_for(std::time::Duration::from_millis(500)).await;
x + 1
});
pin_mut!(stream); //<---- here, pinned!
while let Some(x) = stream.next().await {
println!("{:?}", x)
}
}

关于rust - 为什么使用异步 block 然后使我的流取消固定?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64005557/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com