My goal is to run N functions concurrently, but I do not want to spawn more until all of them have finished. This is what I have so far:
extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::process::Command;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    Command::new("sleep").arg("3").output().expect("failed to execute process");
                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}
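I am spawning 5 functions every second, but I would now like to wait until all of them have finished before spawning more.
From my understanding (I may well have the idea wrong), I am returning a Future inside another future: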
task (Interval ----------------------+ (outer future)
for i in 0..5 { |
tokio::spawn( ----+ |
// my function | (inner) |
Ok(()) | |
) ----+ |
} |
Ok(()) --------------------------+
I am stuck on waiting for the inner futures to finish.
Best answer
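You can achieve this by joining your worker futures, so that they all run concurrently but must all finish together. You can then join that combined future with a delay of 1 second, for the same reason: the iteration completes only when both the batch and the delay are done. Wrap that in a loop to run it forever (or for 5 iterations, for the demo).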
With Tokio 1.3 and futures 0.3:

use futures::{future, future::BoxFuture, stream, FutureExt, StreamExt}; // 0.3.13
use std::time::{Duration, Instant};
use tokio::time; // 1.3.0

#[tokio::main]
async fn main() {
    let now = Instant::now();
    let forever = stream::unfold((), |()| async {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        future::join(batch_of_pages, time::sleep(Duration::from_secs(1))).await;

        Some(((), ()))
    });

    forever.take(5).for_each(|_| async {}).await;
    eprintln!("Took {:?}", now.elapsed());
}

fn all_pages() -> Vec<BoxFuture<'static, ()>> {
    vec![page("a", 100).boxed(), page("b", 200).boxed()]
}

async fn page(name: &'static str, time_ms: u64) {
    eprintln!("page {} starting", name);
    time::sleep(Duration::from_millis(time_ms)).await;
    eprintln!("page {} done", name);
}
Loop starting at Instant { t: 1022680437923626 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022681444390534 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022682453240399 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022683469924126 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022684493522592 }
page a starting
page b starting
page a done
page b done
Took 5.057315596s
With Tokio 0.1 and futures 0.1:

use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay}; // 0.1.18

fn main() {
    let repeat_count = Some(5);

    let forever = future::loop_fn(repeat_count, |repeat_count| {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        let wait = Future::join(batch_of_pages, ez_delay_ms(1000));

        // Run all this again
        wait.map(move |_| {
            if let Some(0) = repeat_count {
                Loop::Break(())
            } else {
                Loop::Continue(repeat_count.map(|c| c - 1))
            }
        })
    });

    tokio::run(forever.map_err(drop));
}

fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
    vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}

fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    future::ok(())
        .inspect(move |_| eprintln!("page {} starting", name))
        .and_then(move |_| ez_delay_ms(time_ms))
        .inspect(move |_| eprintln!("page {} done", name))
}

fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    Delay::new(Instant::now() + Duration::from_millis(ms)).map_err(drop)
}
Loop starting at Instant { tv_sec: 4031391, tv_nsec: 806352322 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031392, tv_nsec: 807792559 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031393, tv_nsec: 809117958 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031394, tv_nsec: 813142458 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031395, tv_nsec: 814407116 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031396, tv_nsec: 815342642 }
page a starting
page b starting
page a done
page b done
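The "join the batch, then join that with a delay" shape does not depend on Tokio. As a rough illustration only, here is a standard-library sketch that uses OS threads instead of futures, which may suit work that is blocking anyway (such as the `Command::new("sleep")` call in the question). The function name `run_batch`, the job closures, and the durations are all hypothetical and are not part of the original answer.

```rust
use std::thread;
use std::time::{Duration, Instant};

// A boxed, sendable unit of blocking work (hypothetical alias for this sketch).
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Runs every job on its own OS thread and returns only after all jobs have
/// finished AND at least `min_period` has elapsed since the batch started.
/// Returns how long the whole batch took.
fn run_batch(jobs: Vec<Job>, min_period: Duration) -> Duration {
    let start = Instant::now();

    // Start all workers so they run concurrently.
    let handles: Vec<thread::JoinHandle<()>> = jobs
        .into_iter()
        .map(|job| thread::spawn(move || job()))
        .collect();

    // "Join" the batch: block until every worker is done.
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    // "Join" with the delay: sleep for whatever is left of the period, if any.
    if let Some(remaining) = min_period.checked_sub(start.elapsed()) {
        thread::sleep(remaining);
    }

    start.elapsed()
}

fn main() {
    // Three rounds for the demo; a `loop` would run forever.
    for round in 0..3 {
        let jobs: Vec<Job> = (0..5u64)
            .map(|i| {
                Box::new(move || {
                    // Stand-in for blocking work.
                    thread::sleep(Duration::from_millis(20 * (i + 1)));
                    println!("round {}: job {} done", round, i);
                }) as Job
            })
            .collect();

        let took = run_batch(jobs, Duration::from_millis(200));
        println!("round {} took {:?}", round, took);
    }
}
```

Because a new round starts only after `run_batch` returns, two rounds can never overlap, and a round whose jobs finish early still waits out the rest of the period. Unlike the Tokio versions above, this blocks the calling thread and spawns one thread per job.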
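Source: the Stack Overflow question "rust - How do I run a set of functions concurrently on a recurring interval without running the same function at the same time using Tokio?": https://stackoverflow.com/questions/56273037/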