
multithreading - Interruptible Rust program that spawns threads

Reposted. Author: 行者123. Updated: 2023-11-29 08:19:04

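I'm trying to write a program that spawns a bunch of threads and then joins them at the end. I want it to be interruptible, because my plan is to make it a constantly running program in a UNIX service.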

The idea is that `worker_pool` will contain all the threads that have been spawned, so `terminate` can be called at any time to collect them.

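I can't seem to find a way to use the `chan_select` crate to do this, because it requires me to first spawn a thread that spawns my child threads. Once I do that, I can no longer use the `worker_pool` variable to join the threads on interrupt, because it had to be moved out of the main loop. If you comment out the line in the interrupt handler that terminates the workers, it compiles.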

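I'm a bit frustrated, because this would be really easy to do in C. I could set up a static pointer, but when I try to do that in Rust I get an error, because I'm using a vector for my threads and I can't initialize a static to an empty vector. I know it is safe to join the workers in the interrupt code, because execution stops there waiting for the signal.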

Maybe there is a better way to handle signals, or maybe I'm missing something I could do.

The error and the code are below:

```
MacBook8088:video_ingest pjohnson$ cargo run
   Compiling video_ingest v0.1.0 (file:///Users/pjohnson/projects/video_ingest)
error[E0382]: use of moved value: `worker_pool`
  --> src/main.rs:30:13
   |
24 |     thread::spawn(move || run(sdone, &mut worker_pool));
   |                   ------- value moved (into closure) here
...
30 |             worker_pool.terminate();
   |             ^^^^^^^^^^^ value used here after move
<chan macros>:42:47: 43:23 note: in this expansion of chan_select! (defined in <chan macros>)
src/main.rs:27:5: 35:6 note: in this expansion of chan_select! (defined in <chan macros>)
   |
   = note: move occurs because `worker_pool` has type `video_ingest::WorkerPool`, which does not implement the `Copy` trait
```

main.rs

```rust
#[macro_use]
extern crate chan;
extern crate chan_signal;
extern crate video_ingest;

use chan_signal::Signal;
use video_ingest::WorkerPool;
use std::thread;
use std::ptr;

///
/// Starts processing
///
fn main() {
    let mut worker_pool = WorkerPool { join_handles: vec![] };

    // Signal gets a value when the OS sent a INT or TERM signal.
    let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);

    // When our work is complete, send a sentinel value on `sdone`.
    let (sdone, rdone) = chan::sync(0);

    // Run work.
    thread::spawn(move || run(sdone, &mut worker_pool));

    // Wait for a signal or for work to be done.
    chan_select! {
        signal.recv() -> signal => {
            println!("received signal: {:?}", signal);
            worker_pool.terminate(); // <-- Comment out to compile
        },
        rdone.recv() => {
            println!("Program completed normally.");
        }
    }
}

fn run(sdone: chan::Sender<()>, worker_pool: &mut WorkerPool) {
    loop {
        worker_pool.ingest();
        worker_pool.terminate();
    }
}
```

lib.rs

```rust
extern crate libc;

use std::thread;
use std::thread::JoinHandle;
use std::os::unix::thread::JoinHandleExt;
use libc::pthread_join;
use libc::c_void;
use std::ptr;
use std::time::Duration;

pub struct WorkerPool {
    pub join_handles: Vec<JoinHandle<()>>
}

impl WorkerPool {

    ///
    /// Does the actual ingestion
    ///
    pub fn ingest(&mut self) {

        // Use 9 threads for an example.
        for i in 0..10 {
            self.join_handles.push(
                thread::spawn(move || {

                    // Get the videos
                    println!("Getting videos for thread {}", i);
                    thread::sleep(Duration::new(5, 0));
                })
            );
        }
    }

    ///
    /// Joins all threads
    ///
    pub fn terminate(&mut self) {
        println!("Total handles: {}", self.join_handles.len());

        for handle in &self.join_handles {
            println!("Joining thread...");

            unsafe {
                let mut state_ptr: *mut *mut c_void = 0 as *mut *mut c_void;
                pthread_join(handle.as_pthread_t(), state_ptr);
            }
        }

        self.join_handles = vec![];
    }
}
```

Best answer

> `terminate` can be called at any time to collect them.

> I don't want to stop the threads; I want to collect them with join. I agree stopping them would not be a good idea.

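These two statements don't make sense together to me. You can only join a thread once it has finished. The words "interruptible" and "at any time" imply that you might try to stop a thread while it is still doing some processing. Which behavior do you want?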

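If you want to be able to stop a thread that is only partially done, you have to enhance your code to check whether it should exit early. This is usually complicated by the fact that you are doing some large computation that you don't control. Ideally, you break the work into chunks and check an exit flag frequently. For video work, for example, you could check once per frame; the response latency is then roughly the time it takes to process one frame.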
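A minimal sketch of the per-chunk check described above, using only the standard library. `process_frame` and `process_video` are hypothetical names, the 10 ms sleep stands in for real per-frame work, and the 50 ms delay before setting the flag is an assumed stand-in for an interrupt:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for decoding/processing one video frame.
fn process_frame(_frame: u32) {
    thread::sleep(Duration::from_millis(10));
}

/// Processes up to `total_frames` frames, checking `stop` before each one.
/// Returns how many frames were actually processed.
fn process_video(total_frames: u32, stop: &AtomicBool) -> u32 {
    let mut done = 0;
    for frame in 0..total_frames {
        // Checked once per frame, so the worst-case shutdown latency is
        // roughly the time needed to process a single frame.
        if stop.load(Ordering::SeqCst) {
            break;
        }
        process_frame(frame);
        done += 1;
    }
    done
}

fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let worker_stop = Arc::clone(&stop);
    let worker = thread::spawn(move || process_video(1_000, &worker_stop));

    // Simulated interrupt: ask the worker to stop shortly after it starts.
    thread::sleep(Duration::from_millis(50));
    stop.store(true, Ordering::SeqCst);

    let processed = worker.join().expect("worker panicked");
    println!("processed {} of 1000 frames before stopping", processed);
}
```

The worker is never killed from outside; it notices the flag at the next frame boundary and returns normally, which is what makes it safe to `join` afterwards.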

> this would be really easy to do in C.

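It would also be really easy to do wrong. For example, the code as currently presented tries to mutate the pool from two different threads without any synchronization. That is a sure path to broken, hard-to-debug code.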

> // Use 9 threads for an example.

`0..10` creates 10 threads.
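A quick check of the range arithmetic, spawning threads through the same kind of `0..10` loop the question uses (the thread bodies here are trivial placeholders):

```rust
use std::thread;

fn main() {
    // `0..10` is half-open: it starts at 0 and stops *before* 10,
    // so this spawns ten threads, not nine.
    let handles: Vec<_> = (0..10).map(|i| thread::spawn(move || i)).collect();
    assert_eq!(handles.len(), 10);

    // Nine threads would be `0..9`, or the inclusive range `0..=8`.
    assert_eq!((0..9).count(), 9);
    assert_eq!((0..=8).count(), 9);

    for handle in handles {
        handle.join().expect("thread panicked");
    }
}
```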


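Anyway, the missing piece of knowledge seems to be `Arc` and `Mutex`. `Arc` allows ownership of a single item to be shared between threads, and `Mutex` allows run-time-checked mutable borrowing across threads.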

```rust
#[macro_use]
extern crate chan;
extern crate chan_signal;

use chan_signal::Signal;
use std::thread::{self, JoinHandle};
use std::sync::{Arc, Mutex};

fn main() {
    let worker_pool = Arc::new(Mutex::new(WorkerPool::new()));

    let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);

    let (work_done_tx, work_done_rx) = chan::sync(0);

    let worker_pool_clone = worker_pool.clone();
    thread::spawn(move || run(work_done_tx, worker_pool_clone));

    // Wait for a signal or for work to be done.
    chan_select! {
        signal.recv() -> signal => {
            println!("received signal: {:?}", signal);
            let mut pool = worker_pool.lock().expect("Unable to lock the pool");
            pool.terminate();
        },
        work_done_rx.recv() => {
            println!("Program completed normally.");
        }
    }
}

fn run(_work_done_tx: chan::Sender<()>, worker_pool: Arc<Mutex<WorkerPool>>) {
    loop {
        let mut worker_pool = worker_pool.lock().expect("Unable to lock the pool");
        worker_pool.ingest();
        worker_pool.terminate();
    }
}

pub struct WorkerPool {
    join_handles: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    pub fn new() -> Self {
        WorkerPool {
            join_handles: vec![],
        }
    }

    pub fn ingest(&mut self) {
        self.join_handles.extend(
            (0..10).map(|i| {
                thread::spawn(move || {
                    println!("Getting videos for thread {}", i);
                })
            })
        )
    }

    pub fn terminate(&mut self) {
        for handle in self.join_handles.drain(..) {
            handle.join().expect("Unable to join thread")
        }
    }
}
```
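The example above needs the `chan` and `chan_signal` crates. To isolate just the `Arc<Mutex<...>>` part, here is a std-only sketch under stated assumptions: `Pool`, `spawn_into`, and `join_all` are hypothetical names, and a second thread that spawns workers stands in for `run`. The point is that each thread holds its own `Arc`, so nothing has to be moved out of `main`:

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical alias: the pool is just a shared, lockable list of handles.
type Pool = Arc<Mutex<Vec<JoinHandle<()>>>>;

/// Spawns `n` short-lived workers and records their handles in the shared pool.
fn spawn_into(pool: &Pool, n: usize) {
    let mut handles = pool.lock().expect("Unable to lock the pool");
    for i in 0..n {
        handles.push(thread::spawn(move || {
            println!("Getting videos for thread {}", i);
        }));
    }
}

/// Removes every handle from the pool and joins it; returns how many were joined.
fn join_all(pool: &Pool) -> usize {
    // Take the handles out while holding the lock, then release the lock
    // before joining so other threads are not blocked during the wait.
    let drained: Vec<JoinHandle<()>> = pool
        .lock()
        .expect("Unable to lock the pool")
        .drain(..)
        .collect();
    let count = drained.len();
    for handle in drained {
        handle.join().expect("Unable to join thread");
    }
    count
}

fn main() {
    let pool: Pool = Arc::new(Mutex::new(Vec::new()));

    // Arc::clone gives the spawner thread its own owning pointer to the pool.
    let spawner_pool = Arc::clone(&pool);
    thread::spawn(move || spawn_into(&spawner_pool, 10))
        .join()
        .expect("Unable to join spawner thread");

    // `main` still owns `pool`, so it can collect the workers afterwards.
    println!("joined {} workers", join_all(&pool));
}
```

Unlike the answer's `terminate`, this `join_all` drops the lock before joining; that is a design choice of the sketch, not something the answer requires.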

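Note that the program logic itself is still poor: even after an interrupt is sent, the `loop` in `run` keeps executing. The main thread will lock the mutex, join all the current threads¹, unlock the mutex, and exit the program. However, the loop can grab the mutex before the main thread exits and start processing some new data! The program then exits in the middle of that processing, which is almost the same as not handling the interrupt at all.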

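¹ Ha, tricked you! At that point there are no running threads. Because the mutex is held for the entire loop body, the only moment anyone else can lock it is when the loop starts over. And since the last instruction in the loop joins all the threads, none of them are still running then.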

> I don't want to let the program terminate before all threads have completed.

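Perhaps this is an artifact of reducing the problem, but I don't see how the infinite loop could ever exit, so the "I'm done" channel seems superfluous.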

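I'd probably just add a flag that says "please stop" when an interrupt is received. I'd then check that flag instead of looping forever, and wait for the running thread to finish before exiting the program.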

```rust
#[macro_use]
extern crate chan;
extern crate chan_signal;

use chan_signal::Signal;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

// `WorkerPool` is the same struct as in the previous example.

fn main() {
    let worker_pool = WorkerPool::new();

    let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);
    let please_stop = Arc::new(AtomicBool::new(false));

    let threads_please_stop = please_stop.clone();
    let runner = thread::spawn(move || run(threads_please_stop, worker_pool));

    // Wait for a signal
    chan_select! {
        signal.recv() -> signal => {
            println!("received signal: {:?}", signal);
            please_stop.store(true, Ordering::SeqCst);
        },
    }

    // Let the runner finish (and join) its current batch before exiting.
    runner.join().expect("Unable to join runner thread");
}

fn run(please_stop: Arc<AtomicBool>, mut worker_pool: WorkerPool) {
    while !please_stop.load(Ordering::SeqCst) {
        worker_pool.ingest();
        worker_pool.terminate();
    }
}
```
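The flag version still relies on `chan` and `chan_signal` to run. As a std-only sketch of the same shutdown order, assuming a 100 ms timer in place of a real INT/TERM signal (that substitution is ours, not the answer's), the runner always finishes and joins its current batch before it looks at the flag again. Returning a batch count from `run` is also an addition for illustration:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

struct WorkerPool {
    join_handles: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    fn new() -> Self {
        WorkerPool { join_handles: Vec::new() }
    }

    fn ingest(&mut self) {
        self.join_handles.extend((0..10).map(|i| {
            thread::spawn(move || {
                println!("Getting videos for thread {}", i);
                thread::sleep(Duration::from_millis(20)); // stand-in for real work
            })
        }));
    }

    fn terminate(&mut self) {
        for handle in self.join_handles.drain(..) {
            handle.join().expect("Unable to join thread");
        }
    }
}

/// Runs whole batches until asked to stop; returns how many batches finished.
fn run(please_stop: Arc<AtomicBool>, mut worker_pool: WorkerPool) -> usize {
    let mut batches = 0;
    while !please_stop.load(Ordering::SeqCst) {
        worker_pool.ingest();
        // Every batch is joined before the flag is checked again, so no
        // worker is still running when `run` returns.
        worker_pool.terminate();
        batches += 1;
    }
    batches
}

fn main() {
    let please_stop = Arc::new(AtomicBool::new(false));
    let runner_stop = Arc::clone(&please_stop);
    let runner = thread::spawn(move || run(runner_stop, WorkerPool::new()));

    // Assumption: a timer stands in for the INT/TERM signal.
    thread::sleep(Duration::from_millis(100));
    please_stop.store(true, Ordering::SeqCst);

    let batches = runner.join().expect("Unable to join runner thread");
    println!("stopped cleanly after {} batches", batches);
}
```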

The original question, "multithreading - Interruptible Rust program that spawns threads", is on Stack Overflow: https://stackoverflow.com/questions/40027334/
