gpt4 book ai didi

rust - 使用 Tokio 0.2 生成非静态 future

转载 作者:行者123 更新时间:2023-12-03 11:33:43 25 4
gpt4 key购买 nike

我有一个异步方法应该并行执行一些 future ,并且只有在所有 future 完成后才返回。但是,它通过引用传递了一些数据,这些数据的存在时间没有 'static 长(它将在 main 方法中的某个时刻被删除)。从概念上讲,它类似于( Playground ):

async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in array {
let task = spawn(do_sth(i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}

#[tokio::main]
async fn main() {
parallel_stuff(&[3, 1, 4, 2]);
}
现在,tokio 希望传递给 spawn 的 future 在 'static 生命周期内有效,因为我可以在不停止 future 的情况下删除句柄。这意味着我上面的示例会产生此错误消息:
error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:12:25
|
12 | async fn parallel_stuff(array: &[u64]) {
| ^^^^^ ------ this data with an anonymous lifetime `'_`...
| |
| ...is captured here...
...
15 | let task = spawn(do_sth(i));
| ----- ...and is required to live as long as `'static` here
所以我的问题是:如何生成仅对当前上下文有效的 future ,然后我可以等到所有 future 都完成?
(如果这在 tokio 0.3 上可行但在 0.2 上不可行,我仍然感兴趣,尽管目前这会涉及很多 git 依赖项)

最佳答案

不可能从异步 Rust 生成非 'static future 。这是因为任何异步函数都可能在任何时候被取消,因此无法保证调用者真的比生成的任务更长寿。
确实有各种 crate 允许异步任务的范围生成,但是这些 crate 不能从异步代码中使用。它们允许从非异步代码生成范围内的异步任务。这并不违反上面的问题,因为产生它们的非异步代码在任何时候都不能取消,因为它不是异步的。
一般有两种方法可以解决这个问题:

  • 使用 'static 而不是普通引用生成 Arc 任务。
  • 使用 futures crate 中的并发原语而不是 spawning。

  • 请注意,此答案适用于 Tokio 0.2.x0.3.x

    通常要生成静态任务并使用 Arc ,您必须拥有相关值的所有权。这意味着由于您的函数通过引用获取参数,因此您不能在不克隆数据的情况下使用此技术。
    async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
    }

    async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can shared it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);

    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
    // Cloning an Arc does not clone the data.
    let shared_clone = shared.clone();
    let task = spawn(do_sth(shared_clone, i));
    tasks.push(task);
    }
    for task in tasks {
    task.await;
    }
    }
    请注意,如果您有对数据的可变引用,并且数据是 Sized ,即不是切片,则可以临时获得它的所有权。
    async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
    }

    async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);

    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
    // Cloning an Arc does not clone the data.
    let shared_clone = shared.clone();
    let task = spawn(do_sth(shared_clone, i));
    tasks.push(task);
    }
    for task in tasks {
    task.await;
    }

    // Put back the vector where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
    }

    另一种选择是使用 futures crate 中的并发原语。这些具有使用非 'static 数据的优点,但缺点是任务无法同时在多个线程上运行。
    对于许多工作流来说,这完全没问题,因为无论如何异步代码都应该花费大部分时间等待 IO。
    一种方法是使用 FuturesUnordered 。这是一个特殊的集合,可以存储许多不同的 future ,它有一个 next 函数,可以同时运行所有 future ,并在第一个完成后返回。 ( next 函数只有在导入 StreamExt 时才可用)
    你可以这样使用它:
    use futures::stream::{FuturesUnordered, StreamExt};

    async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
    }

    async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
    let task = do_sth(i);
    tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
    }
    注意: FuturesUnordered 必须定义在共享值之后。否则你会得到一个借用错误,这是由于它们以错误的顺序被删除。

    另一种方法是使用 Stream 。对于流,您可以使用 buffer_unordered 。这是一个在内部使用 FuturesUnordered 的实用程序。
    use futures::stream::StreamExt;

    async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
    }

    async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
    .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
    .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
    .for_each(|()| async {})
    .await;
    }
    请注意,在这两种情况下,导入 StreamExt 都很重要,因为它提供了在不导入扩展特征的情况下无法在流上使用的各种方法。

    关于rust - 使用 Tokio 0.2 生成非静态 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65269738/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com