作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
给定多个以 Output
值完成的线程,我如何获得生成的第一个 Output
?理想情况下,同时仍然能够按照它们的生成顺序稍后获取剩余的 Output
,并记住某些线程可能会或可能不会终止。
例子:
struct Output(i32);
fn main() {
let mut spawned_threads = Vec::new();
for i in 0..10 {
let join_handle: ::std::thread::JoinHandle<Output> = ::std::thread::spawn(move || {
// pretend to do some work that takes some amount of time
::std::thread::sleep(::std::time::Duration::from_millis(
(1000 - (100 * i)) as u64,
));
Output(i) // then pretend to return the `Output` of that work
});
spawned_threads.push(join_handle);
}
// I can do this to wait for each thread to finish and collect all `Output`s
let outputs_in_order_of_thread_spawning = spawned_threads
.into_iter()
.map(::std::thread::JoinHandle::join)
.collect::<Vec<::std::thread::Result<Output>>>();
// but how would I get the `Output`s in order of completed threads?
}
我可以使用共享队列/ channel /类似工具自行解决问题,但是是否有内置 API 或现有库可以更优雅地为我解决这个用例?
我正在寻找类似这样的 API:
fn race_threads<A: Send>(
threads: Vec<::std::thread::JoinHandle<A>>
) -> (::std::thread::Result<A>, Vec<::std::thread::JoinHandle<A>>) {
unimplemented!("so far this doesn't seem to exist")
}
( Rayon 的 join
是我能找到的最接近的,但是 a)它只跑 2 个闭包而不是任意数量的闭包,并且 b)线程池 w/work stealing 方法不会我的用例中有一些可能永远运行的闭包。)
可以使用来自 How to check if a thread has finished in Rust? 的指针来解决这个用例就像可以使用 MPSC channel 解决这个用例一样,但是在这里我正在寻找一个干净的 API 来竞争 n
线程(或者如果失败,n
关闭 n
个线程)。
最佳答案
这些问题可以通过使用 condition variable 来解决。 :
use std::sync::{Arc, Condvar, Mutex};
#[derive(Debug)]
struct Output(i32);
enum State {
Starting,
Joinable,
Joined,
}
fn main() {
let pair = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
let mut spawned_threads = Vec::new();
let &(ref lock, ref cvar) = &*pair;
for i in 0..10 {
let my_pair = pair.clone();
let join_handle: ::std::thread::JoinHandle<Output> = ::std::thread::spawn(move || {
// pretend to do some work that takes some amount of time
::std::thread::sleep(::std::time::Duration::from_millis(
(1000 - (100 * i)) as u64,
));
let &(ref lock, ref cvar) = &*my_pair;
let mut joinable = lock.lock().unwrap();
joinable[i] = State::Joinable;
cvar.notify_one();
Output(i as i32) // then pretend to return the `Output` of that work
});
lock.lock().unwrap().push(State::Starting);
spawned_threads.push(Some(join_handle));
}
let mut should_stop = false;
while !should_stop {
let locked = lock.lock().unwrap();
let mut locked = cvar.wait(locked).unwrap();
should_stop = true;
for (i, state) in locked.iter_mut().enumerate() {
match *state {
State::Starting => {
should_stop = false;
}
State::Joinable => {
*state = State::Joined;
println!("{:?}", spawned_threads[i].take().unwrap().join());
}
State::Joined => (),
}
}
}
}
我并不是说这是最简单的方法。每次子线程完成时,条件变量都会唤醒主线程。该列表可以显示每个线程的状态,如果一个(即将)完成,则可以加入。
关于multithreading - 是否有一个 API 可以让 N 个线程(或 N 个线程上的 N 个闭包)完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49846056/
我是一名优秀的程序员,十分优秀!