gpt4 book ai didi

multithreading - 是否有一个 API 可以让 N 个线程(或 N 个线程上的 N 个闭包)完成?

转载 作者:行者123 更新时间:2023-11-29 08:18:27 24 4
gpt4 key购买 nike

给定多个以 Output 值完成的线程,我如何获得生成的第一个 Output?理想情况下,同时仍然能够按照它们的生成顺序稍后获取剩余的 Output,并记住某些线程可能会或可能不会终止。

例子:

struct Output(i32);

fn main() {
let mut spawned_threads = Vec::new();

for i in 0..10 {
let join_handle: ::std::thread::JoinHandle<Output> = ::std::thread::spawn(move || {
// pretend to do some work that takes some amount of time
::std::thread::sleep(::std::time::Duration::from_millis(
(1000 - (100 * i)) as u64,
));
Output(i) // then pretend to return the `Output` of that work
});
spawned_threads.push(join_handle);
}

// I can do this to wait for each thread to finish and collect all `Output`s
let outputs_in_order_of_thread_spawning = spawned_threads
.into_iter()
.map(::std::thread::JoinHandle::join)
.collect::<Vec<::std::thread::Result<Output>>>();

// but how would I get the `Output`s in order of completed threads?
}

我可以使用共享队列/ channel /类似工具自行解决问题,但是是否有内置 API 或现有库可以更优雅地为我解决这个用例?

我正在寻找类似这样的 API:

fn race_threads<A: Send>(
threads: Vec<::std::thread::JoinHandle<A>>
) -> (::std::thread::Result<A>, Vec<::std::thread::JoinHandle<A>>) {
unimplemented!("so far this doesn't seem to exist")
}

( Rayonjoin 是我能找到的最接近的,但是 a)它只跑 2 个闭包而不是任意数量的闭包,并且 b)线程池 w/work stealing 方法不会我的用例中有一些可能永远运行的闭包。)

可以使用来自 How to check if a thread has finished in Rust? 的指针来解决这个用例就像可以使用 MPSC channel 解决这个用例一样,但是在这里我正在寻找一个干净的 API 来竞争 n 线程(或者如果失败,n 关闭 n 个线程)。

最佳答案

这些问题可以通过使用 condition variable 来解决。 :

use std::sync::{Arc, Condvar, Mutex};

#[derive(Debug)]
struct Output(i32);

enum State {
Starting,
Joinable,
Joined,
}

fn main() {
let pair = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
let mut spawned_threads = Vec::new();

let &(ref lock, ref cvar) = &*pair;
for i in 0..10 {
let my_pair = pair.clone();
let join_handle: ::std::thread::JoinHandle<Output> = ::std::thread::spawn(move || {
// pretend to do some work that takes some amount of time
::std::thread::sleep(::std::time::Duration::from_millis(
(1000 - (100 * i)) as u64,
));

let &(ref lock, ref cvar) = &*my_pair;
let mut joinable = lock.lock().unwrap();
joinable[i] = State::Joinable;
cvar.notify_one();
Output(i as i32) // then pretend to return the `Output` of that work
});
lock.lock().unwrap().push(State::Starting);
spawned_threads.push(Some(join_handle));
}

let mut should_stop = false;
while !should_stop {
let locked = lock.lock().unwrap();
let mut locked = cvar.wait(locked).unwrap();

should_stop = true;
for (i, state) in locked.iter_mut().enumerate() {
match *state {
State::Starting => {
should_stop = false;
}
State::Joinable => {
*state = State::Joined;
println!("{:?}", spawned_threads[i].take().unwrap().join());
}
State::Joined => (),
}
}
}
}

(playground link)

我并不是说这是最简单的方法。每次子线程完成时,条件变量都会唤醒主线程。该列表可以显示每个线程的状态,如果一个(即将)完成,则可以加入。

关于multithreading - 是否有一个 API 可以让 N 个线程(或 N 个线程上的 N 个闭包)完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49846056/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com