
rust - How do I run a set of functions concurrently on a recurring interval with Tokio, without running the same function at the same time?

Reposted. Author: 行者123. Last updated: 2023-11-29 08:24:27

My goal is to run N functions concurrently, but I don't want to spawn more until all of them have finished. This is what I have so far:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::process::Command; // needed for Command::new below
use std::time;
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    // Fire once per second, starting immediately
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                // Each spawned task runs independently; nothing waits for it
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // std::thread::sleep(time::Duration::from_secs(3));
                    Command::new("sleep").arg("3").output().expect("failed to execute process");

                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

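I am spawning 5 functions every second, but I would now like to wait until all of them have finished before spawning more.

As I understand it (I may well have this wrong), I am returning a Future inside another future: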

task (Interval ----------------------+ (outer future)
    for i in 0..5 {                  |
        tokio::spawn(  ----+         |
            // my function | (inner) |
            Ok(())         |         |
        )              ----+         |
    }                                |
    Ok(()) --------------------------+

I am stuck trying to wait for the inner futures to finish.

Best answer

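You can achieve this by joining your worker futures, so that they all run in parallel but must all finish together. You can then join that combined future with a 1-second delay, for the same reason: the result resolves only when both the workers and the delay are done. Wrap that in a loop to run it forever (or for 5 iterations, for the demo).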
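The same "join the workers, then join with a delay" shape can be sketched without any async runtime. The following is a minimal illustration using plain `std::thread`, not Tokio futures, which may be a reasonable fit when the work is genuinely blocking (as the `sleep` command in the question is). The names `run_batch` and `page` and the job list are hypothetical and exist only for this sketch.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for one unit of blocking work.
fn page(name: &'static str, time_ms: u64) {
    eprintln!("page {} starting", name);
    thread::sleep(Duration::from_millis(time_ms));
    eprintln!("page {} done", name);
}

/// Runs one batch: every job gets its own thread, and the call returns only
/// after all jobs have finished AND at least `min_period` has elapsed.
/// Returns how long the whole batch took.
fn run_batch(jobs: &[(&'static str, u64)], min_period: Duration) -> Duration {
    let started = Instant::now();

    // Start all workers so they run in parallel.
    let handles: Vec<_> = jobs
        .iter()
        .map(|&(name, ms)| thread::spawn(move || page(name, ms)))
        .collect();

    // "Join" the workers: block until every thread is done.
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    // "Join" with the delay: sleep for whatever is left of the period, if any.
    if let Some(remaining) = min_period.checked_sub(started.elapsed()) {
        thread::sleep(remaining);
    }

    started.elapsed()
}

fn main() {
    let jobs = [("a", 100), ("b", 200)];
    // A new batch can only start after the previous call has returned.
    for _ in 0..5 {
        let took = run_batch(&jobs, Duration::from_secs(1));
        eprintln!("batch took {:?}", took);
    }
}
```

Because `run_batch` does not return until every worker has been joined, two batches can never overlap, which is the property the question asks for.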

Tokio 1.3

use futures::{future, future::BoxFuture, stream, FutureExt, StreamExt}; // 0.3.13
use std::time::{Duration, Instant};
use tokio::time; // 1.3.0

#[tokio::main]
async fn main() {
    let now = Instant::now();
    let forever = stream::unfold((), |()| async {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        future::join(batch_of_pages, time::sleep(Duration::from_secs(1))).await;

        Some(((), ()))
    });

    forever.take(5).for_each(|_| async {}).await;
    eprintln!("Took {:?}", now.elapsed());
}

fn all_pages() -> Vec<BoxFuture<'static, ()>> {
    vec![page("a", 100).boxed(), page("b", 200).boxed()]
}

async fn page(name: &'static str, time_ms: u64) {
    eprintln!("page {} starting", name);
    time::sleep(Duration::from_millis(time_ms)).await;
    eprintln!("page {} done", name);
}

Loop starting at Instant { t: 1022680437923626 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022681444390534 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022682453240399 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022683469924126 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022684493522592 }
page a starting
page b starting
page a done
page b done
Took 5.057315596s
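
The total of roughly 5 seconds is what the structure of the loop predicts: the pages run concurrently, so a batch lasts about as long as its slowest page (200 ms here), and each iteration lasts as long as the slower of the batch and the 1-second delay. A small sketch of that arithmetic, where `iteration_time` is a hypothetical helper written for this illustration and scheduling overhead is ignored:

```rust
use std::time::Duration;

/// Expected duration of one loop iteration: the pages run concurrently, so the
/// batch lasts as long as its slowest page, and the iteration lasts as long as
/// the slower of the batch and the fixed delay.
fn iteration_time(page_times: &[Duration], delay: Duration) -> Duration {
    let batch = page_times.iter().copied().max().unwrap_or(Duration::ZERO);
    batch.max(delay)
}

fn main() {
    // The two pages from the example above: 100 ms and 200 ms.
    let pages = [Duration::from_millis(100), Duration::from_millis(200)];
    let per_iteration = iteration_time(&pages, Duration::from_secs(1));

    // Five iterations, as in `forever.take(5)`.
    let total = per_iteration * 5;
    println!("per iteration: {:?}, total: {:?}", per_iteration, total);
}
```

The measured 5.057 s is slightly above this estimate, which is consistent with small timer and scheduling overheads on each iteration. If a page took longer than the delay, the page time would dominate instead.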

Tokio 0.1

use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay}; // 0.1.18

fn main() {
    let repeat_count = Some(5);

    let forever = future::loop_fn(repeat_count, |repeat_count| {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        let wait = Future::join(batch_of_pages, ez_delay_ms(1000));

        // Run all this again
        wait.map(move |_| {
            if let Some(0) = repeat_count {
                Loop::Break(())
            } else {
                Loop::Continue(repeat_count.map(|c| c - 1))
            }
        })
    });

    tokio::run(forever.map_err(drop));
}

fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
    vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}

fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    future::ok(())
        .inspect(move |_| eprintln!("page {} starting", name))
        .and_then(move |_| ez_delay_ms(time_ms))
        .inspect(move |_| eprintln!("page {} done", name))
}

fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    Delay::new(Instant::now() + Duration::from_millis(ms)).map_err(drop)
}

Loop starting at Instant { tv_sec: 4031391, tv_nsec: 806352322 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031392, tv_nsec: 807792559 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031393, tv_nsec: 809117958 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031394, tv_nsec: 813142458 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031395, tv_nsec: 814407116 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031396, tv_nsec: 815342642 }
page a starting
page b starting
page a done
page b done
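
Note that this output contains six "Loop starting" lines even though the counter starts at `Some(5)`. That follows from the loop condition: the body runs first and the loop only breaks once the counter is already `Some(0)`. The sketch below models just that counter logic with a simplified local `Loop` enum (a stand-in written for this illustration, not the real type from futures 0.1) and hypothetical helpers `step` and `count_iterations`:

```rust
/// Simplified local stand-in for the `Loop` enum used with `loop_fn`.
#[derive(Debug, PartialEq)]
enum Loop {
    Break,
    Continue(Option<u32>),
}

/// Mirrors the decision made at the end of each iteration in the example:
/// break on `Some(0)`, otherwise continue with the counter decremented.
/// `None` is never decremented, so it continues forever.
fn step(repeat_count: Option<u32>) -> Loop {
    if let Some(0) = repeat_count {
        Loop::Break
    } else {
        Loop::Continue(repeat_count.map(|c| c - 1))
    }
}

/// Counts how many times the loop body runs for a given starting counter.
/// Do not call this with `None`: that models the endless loop and never returns.
fn count_iterations(start: Option<u32>) -> u32 {
    let mut state = start;
    let mut iterations = 0;
    loop {
        iterations += 1; // the body runs before the break/continue decision
        match step(state) {
            Loop::Break => return iterations,
            Loop::Continue(next) => state = next,
        }
    }
}

fn main() {
    println!("Some(5) runs the body {} times", count_iterations(Some(5)));
}
```

Starting from `Some(4)` instead would give exactly five iterations under this model, and starting from `None` keeps the loop running indefinitely.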


This post is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/56273037/
