
multithreading - what am I missing? Severe performance degradation over time


In my application, a method runs quickly once it starts, but as it approaches completion its performance degrades continuously, and this appears to be unrelated to the workload (the number of iterations of the function each thread has to run). Towards the end it becomes incredibly slow compared to before (notably, this is not simply because fewer threads are left unfinished; each remaining thread appears to slow down individually).
I cannot work out why this is happening, so I am asking: what am I doing wrong?

Overview of CPU usage:
A slideshow of the problem.
Notably, the CPU temperature stays low throughout.
This phase varies with the configured workload; with more work it looks better, with all threads staying near 100%. At this moment, though, it looks fine:
[screenshot: all threads near 100%]
Here we see the earlier performance being sustained,
[screenshot: sustained performance]
and here we see it beginning to degrade. I do not know why this happens.
[screenshot: performance beginning to degrade]
After some period of this, most threads have finished their work and the rest keep running; at this point, although they appear to be at 100%, they are actually getting through their remaining work incredibly slowly. I do not understand why this happens.
[screenshot: remaining threads at 100% but progressing very slowly]

Printing progress
I wrote a multithreaded random_search (documentation link) function for optimization. Most of this function's complexity comes from passing data between threads so that it can print output showing progress, for example:

2300
565 (24.57%) 00:00:11 / 00:00:47 [25.600657363049734] { [563.0ns, 561.3ms, 125.0ns, 110.0ns] [2.0µs, 361.8ms, 374.0ns, 405.0ns] [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] }
I have been trying to use this output to work out what is going wrong, but I have no idea.
This output describes:
  • The total number of iterations: 2300.
  • The current total iteration count: 565.
  • The running time: 00:00:11 (mm:ss:ms).
  • The estimated remaining time: 00:00:47 (mm:ss:ms).
  • The current best value: [25.600657363049734].
  • The most recently measured times between execution positions (effectively how long a thread took to get from one line to another, defined specifically by update_execution_position in the code below; see the annotated reading just after this list): [563.0ns, 561.3ms, 125.0ns, 110.0ns].
  • The average times between execution positions (averaged over the whole runtime, not since the last measurement): [2.0µs, 361.8ms, 374.0ns, 405.0ns].
  • The execution positions of the threads (0 means a thread has finished; any other value means the thread has passed the line that sets that position but not yet reached the next line that changes it, i.e. it is effectively between two positions): [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1].
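To make the timing slots concrete, here is how they line up with the search loop shown further down (each slot records the time elapsed since the previous update_execution_position call):

// slot 1: generating the random parameter values   (recorded at update_execution_position(1))
// slot 2: running the objective function f         (recorded at update_execution_position(2))
// slot 3: comparing/storing the best value         (recorded at update_execution_position(3))
// slot 4: incrementing the shared counter          (recorded at update_execution_position(4))

Reading the sample line above this way, slot 2 dominates (561.3ms most recently against a 361.8ms running average, with the other slots in the nanosecond-to-microsecond range), which suggests the bulk of the time is spent inside the objective function f rather than in the surrounding thread machinery.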
  • random_search code:
    Given that I have tested the implementation with the library's other methods (grid_search and simulated_annealing), this suggests to me that the problem does not lie, at least not entirely, in random_search.rs. random_search.rs:
pub fn random_search<
    A: 'static + Send + Sync,
    T: 'static + Copy + Send + Sync + Default + SampleUniform + PartialOrd,
    const N: usize,
>(
    // Generics
    ranges: [Range<T>; N],
    f: fn(&[T; N], Option<Arc<A>>) -> f64,
    evaluation_data: Option<Arc<A>>,
    polling: Option<Polling>,
    // Specifics
    iterations: u64,
) -> [T; N] {
    // Gets cpu data
    let cpus = num_cpus::get() as u64;
    let search_cpus = cpus - 1; // 1 cpu is used for polling, this one.

    let remainder = iterations % search_cpus;
    let per = iterations / search_cpus;

    let ranges_arc = Arc::new(ranges);

    let (best_value, best_params) = search(
        // Generics
        ranges_arc.clone(),
        f,
        evaluation_data.clone(),
        // Since we are doing this on the same thread, we don't need to use these
        Arc::new(AtomicU64::new(Default::default())),
        Arc::new(Mutex::new(Default::default())),
        Arc::new(AtomicBool::new(false)),
        Arc::new(AtomicU8::new(0)),
        Arc::new([
            Mutex::new((Duration::new(0, 0), 0)),
            Mutex::new((Duration::new(0, 0), 0)),
            Mutex::new((Duration::new(0, 0), 0)),
            Mutex::new((Duration::new(0, 0), 0)),
        ]),
        // Specifics
        remainder,
    );

    let thread_exit = Arc::new(AtomicBool::new(false));
    // (handles,(counters,thread_bests))
    let (handles, links): (Vec<_>, Vec<_>) = (0..search_cpus)
        .map(|_| {
            let ranges_clone = ranges_arc.clone();
            let counter = Arc::new(AtomicU64::new(0));
            let thread_best = Arc::new(Mutex::new(f64::MAX));
            let thread_execution_position = Arc::new(AtomicU8::new(0));
            let thread_execution_time = Arc::new([
                Mutex::new((Duration::new(0, 0), 0)),
                Mutex::new((Duration::new(0, 0), 0)),
                Mutex::new((Duration::new(0, 0), 0)),
                Mutex::new((Duration::new(0, 0), 0)),
            ]);

            let counter_clone = counter.clone();
            let thread_best_clone = thread_best.clone();
            let thread_exit_clone = thread_exit.clone();
            let evaluation_data_clone = evaluation_data.clone();
            let thread_execution_position_clone = thread_execution_position.clone();
            let thread_execution_time_clone = thread_execution_time.clone();
            (
                thread::spawn(move || {
                    search(
                        // Generics
                        ranges_clone,
                        f,
                        evaluation_data_clone,
                        counter_clone,
                        thread_best_clone,
                        thread_exit_clone,
                        thread_execution_position_clone,
                        thread_execution_time_clone,
                        // Specifics
                        per,
                    )
                }),
                (
                    counter,
                    (
                        thread_best,
                        (thread_execution_position, thread_execution_time),
                    ),
                ),
            )
        })
        .unzip();
    let (counters, links): (Vec<Arc<AtomicU64>>, Vec<_>) = links.into_iter().unzip();
    let (thread_bests, links): (Vec<Arc<Mutex<f64>>>, Vec<_>) = links.into_iter().unzip();
    let (thread_execution_positions, thread_execution_times) = links.into_iter().unzip();

    if let Some(poll_data) = polling {
        poll(
            poll_data,
            counters,
            remainder,
            iterations,
            thread_bests,
            thread_exit,
            thread_execution_positions,
            thread_execution_times,
        );
    }

    let joins: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();

    let (_, best_params) = joins
        .into_iter()
        .fold((best_value, best_params), |(bv, bp), (v, p)| {
            if v < bv {
                (v, p)
            } else {
                (bv, bp)
            }
        });

    return best_params;

    fn search<
        A: 'static + Send + Sync,
        T: 'static + Copy + Send + Sync + Default + SampleUniform + PartialOrd,
        const N: usize,
    >(
        // Generics
        ranges: Arc<[Range<T>; N]>,
        f: fn(&[T; N], Option<Arc<A>>) -> f64,
        evaluation_data: Option<Arc<A>>,
        counter: Arc<AtomicU64>,
        best: Arc<Mutex<f64>>,
        thread_exit: Arc<AtomicBool>,
        thread_execution_position: Arc<AtomicU8>,
        thread_execution_times: Arc<[Mutex<(Duration, u64)>; 4]>,
        // Specifics
        iterations: u64,
    ) -> (f64, [T; N]) {
        let mut execution_position_timer = Instant::now();
        let mut rng = thread_rng();
        let mut params = [Default::default(); N];

        let mut best_value = f64::MAX;
        let mut best_params = [Default::default(); N];
        for _ in 0..iterations {
            // Gen random values
            for (range, param) in ranges.iter().zip(params.iter_mut()) {
                *param = rng.gen_range(range.clone());
            }

            // Update execution position
            execution_position_timer = update_execution_position(
                1,
                execution_position_timer,
                &thread_execution_position,
                &thread_execution_times,
            );

            // Run function
            let new_value = f(&params, evaluation_data.clone());

            // Update execution position
            execution_position_timer = update_execution_position(
                2,
                execution_position_timer,
                &thread_execution_position,
                &thread_execution_times,
            );

            // Check best
            if new_value < best_value {
                best_value = new_value;
                best_params = params;
                *best.lock().unwrap() = best_value;
            }

            // Update execution position
            execution_position_timer = update_execution_position(
                3,
                execution_position_timer,
                &thread_execution_position,
                &thread_execution_times,
            );

            counter.fetch_add(1, Ordering::SeqCst);

            // Update execution position
            execution_position_timer = update_execution_position(
                4,
                execution_position_timer,
                &thread_execution_position,
                &thread_execution_times,
            );

            if thread_exit.load(Ordering::SeqCst) {
                break;
            }
        }
        // Update execution position
        // 0 represents ended state
        thread_execution_position.store(0, Ordering::SeqCst);
        return (best_value, best_params);
    }
}
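For concreteness, here is how the work splits for the 2300 iterations used below, assuming a machine with 24 logical CPUs (an assumed value, though it matches the 23 execution positions in the sample output above):

let cpus = 24;                       // num_cpus::get(), assumed for this example
let search_cpus = cpus - 1;          // 23 worker threads; 1 CPU is kept for the polling loop
let per = 2300 / search_cpus;        // 100 iterations per worker thread
let remainder = 2300 % search_cpus;  // 0 iterations run inline on the polling thread before spawning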
util.rs:
pub fn update_execution_position<const N: usize>(
    i: usize,
    execution_position_timer: Instant,
    thread_execution_position: &Arc<AtomicU8>,
    thread_execution_times: &Arc<[Mutex<(Duration, u64)>; N]>,
) -> Instant {
    {
        let mut data = thread_execution_times[i - 1].lock().unwrap();
        data.0 += execution_position_timer.elapsed();
        data.1 += 1;
    }
    thread_execution_position.store(i as u8, Ordering::SeqCst);
    Instant::now()
}

pub struct Polling {
    pub poll_rate: u64,
    pub printing: bool,
    pub early_exit_minimum: Option<f64>,
    pub thread_execution_reporting: bool,
}
impl Polling {
    const DEFAULT_POLL_RATE: u64 = 10;
    pub fn new(printing: bool, early_exit_minimum: Option<f64>) -> Self {
        Self {
            poll_rate: Polling::DEFAULT_POLL_RATE,
            printing,
            early_exit_minimum,
            thread_execution_reporting: false,
        }
    }
}
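For reference, Polling::new(printing, early_exit_minimum) is just shorthand for the struct literal with the 10ms default poll rate and thread_execution_reporting disabled; in main.rs further down I spell the struct out because I want poll_rate: 100 and thread_execution_reporting: true. The two forms below are equivalent:

let via_new = Polling::new(true, None);
let explicit = Polling {
    poll_rate: 10,
    printing: true,
    early_exit_minimum: None,
    thread_execution_reporting: false,
};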

pub fn poll<const N: usize>(
    data: Polling,
    // Current count of each thread.
    counters: Vec<Arc<AtomicU64>>,
    offset: u64,
    // Final total iterations.
    iterations: u64,
    // Best values of each thread.
    thread_bests: Vec<Arc<Mutex<f64>>>,
    // Early exit switch.
    thread_exit: Arc<AtomicBool>,
    // Current positions of execution of each thread.
    thread_execution_positions: Vec<Arc<AtomicU8>>,
    // Current average times between execution positions for each thread
    thread_execution_times: Vec<Arc<[Mutex<(Duration, u64)>; N]>>,
) {
    let start = Instant::now();
    let mut stdout = stdout();
    let mut count = offset
        + counters
            .iter()
            .map(|c| c.load(Ordering::SeqCst))
            .sum::<u64>();

    if data.printing {
        println!("{:20}", iterations);
    }

    let mut poll_time = Instant::now();
    let mut held_best: f64 = f64::MAX;

    let mut held_average_execution_times: [(Duration, u64); N] =
        vec![(Duration::new(0, 0), 0); N].try_into().unwrap();
    let mut held_recent_execution_times: [Duration; N] =
        vec![Duration::new(0, 0); N].try_into().unwrap();
    while count < iterations {
        if data.printing {
            // loop {
            let percent = count as f32 / iterations as f32;

            // If count == 0, give 00... for remaining time as placeholder
            let remaining_time_estimate = if count == 0 {
                Duration::new(0, 0)
            } else {
                start.elapsed().div_f32(percent)
            };
            print!(
                "\r{:20} ({:.2}%) {} / {} [{}] {}\t",
                count,
                100. * percent,
                print_duration(start.elapsed(), 0..3),
                print_duration(remaining_time_estimate, 0..3),
                if held_best == f64::MAX {
                    String::from("?")
                } else {
                    format!("{}", held_best)
                },
                if data.thread_execution_reporting {
                    let (average_execution_times, recent_execution_times): (
                        Vec<String>,
                        Vec<String>,
                    ) = (0..thread_execution_times[0].len())
                        .map(|i| {
                            let (mut sum, mut num) = (Duration::new(0, 0), 0);
                            for n in 0..thread_execution_times.len() {
                                {
                                    let mut data = thread_execution_times[n][i].lock().unwrap();
                                    sum += data.0;
                                    held_average_execution_times[i].0 += data.0;
                                    num += data.1;
                                    held_average_execution_times[i].1 += data.1;
                                    *data = (Duration::new(0, 0), 0);
                                }
                            }
                            if num > 0 {
                                held_recent_execution_times[i] = sum.div_f64(num as f64);
                            }
                            (
                                if held_average_execution_times[i].1 > 0 {
                                    format!(
                                        "{:.1?}",
                                        held_average_execution_times[i]
                                            .0
                                            .div_f64(held_average_execution_times[i].1 as f64)
                                    )
                                } else {
                                    String::from("?")
                                },
                                if held_recent_execution_times[i] > Duration::new(0, 0) {
                                    format!("{:.1?}", held_recent_execution_times[i])
                                } else {
                                    String::from("?")
                                },
                            )
                        })
                        .unzip();

                    let execution_positions: Vec<u8> = thread_execution_positions
                        .iter()
                        .map(|pos| pos.load(Ordering::SeqCst))
                        .collect();
                    format!(
                        "{{ [{}] [{}] {:.?} }}",
                        recent_execution_times.join(", "),
                        average_execution_times.join(", "),
                        execution_positions
                    )
                } else {
                    String::from("")
                }
            );
            stdout.flush().unwrap();
        }

        // Updates best and does early exiting
        match (data.early_exit_minimum, data.printing) {
            (Some(early_exit), true) => {
                for thread_best in thread_bests.iter() {
                    let thread_best_temp = *thread_best.lock().unwrap();
                    if thread_best_temp < held_best {
                        held_best = thread_best_temp;
                        if thread_best_temp <= early_exit {
                            thread_exit.store(true, Ordering::SeqCst);
                            println!();
                            return;
                        }
                    }
                }
            }
            (None, true) => {
                for thread_best in thread_bests.iter() {
                    let thread_best_temp = *thread_best.lock().unwrap();
                    if thread_best_temp < held_best {
                        held_best = thread_best_temp;
                    }
                }
            }
            (Some(early_exit), false) => {
                for thread_best in thread_bests.iter() {
                    if *thread_best.lock().unwrap() <= early_exit {
                        thread_exit.store(true, Ordering::SeqCst);
                        return;
                    }
                }
            }
            (None, false) => {}
        }

        thread::sleep(saturating_sub(
            Duration::from_millis(data.poll_rate),
            poll_time.elapsed(),
        ));
        poll_time = Instant::now();

        count = offset
            + counters
                .iter()
                .map(|c| c.load(Ordering::SeqCst))
                .sum::<u64>();
    }

    if data.printing {
        println!(
            "\r{:20} (100.00%) {} / {} [{}] {}\t",
            count,
            print_duration(start.elapsed(), 0..3),
            print_duration(start.elapsed(), 0..3),
            held_best,
            if data.thread_execution_reporting {
                let (average_execution_times, recent_execution_times): (Vec<String>, Vec<String>) =
                    (0..thread_execution_times[0].len())
                        .map(|i| {
                            let (mut sum, mut num) = (Duration::new(0, 0), 0);
                            for n in 0..thread_execution_times.len() {
                                {
                                    let mut data = thread_execution_times[n][i].lock().unwrap();
                                    sum += data.0;
                                    held_average_execution_times[i].0 += data.0;
                                    num += data.1;
                                    held_average_execution_times[i].1 += data.1;
                                    *data = (Duration::new(0, 0), 0);
                                }
                            }
                            if num > 0 {
                                held_recent_execution_times[i] = sum.div_f64(num as f64);
                            }
                            (
                                if held_average_execution_times[i].1 > 0 {
                                    format!(
                                        "{:.1?}",
                                        held_average_execution_times[i]
                                            .0
                                            .div_f64(held_average_execution_times[i].1 as f64)
                                    )
                                } else {
                                    String::from("?")
                                },
                                if held_recent_execution_times[i] > Duration::new(0, 0) {
                                    format!("{:.1?}", held_recent_execution_times[i])
                                } else {
                                    String::from("?")
                                },
                            )
                        })
                        .unzip();

                let execution_positions: Vec<u8> = thread_execution_positions
                    .iter()
                    .map(|pos| pos.load(Ordering::SeqCst))
                    .collect();
                format!(
                    "{{ [{}] [{}] {:.?} }}",
                    recent_execution_times.join(", "),
                    average_execution_times.join(", "),
                    execution_positions
                )
            } else {
                String::from("")
            }
        );
        stdout.flush().unwrap();
    }
}
// Since `Duration::saturating_sub` is unstable this is an alternative.
fn saturating_sub(a: Duration, b: Duration) -> Duration {
    if let Some(dur) = a.checked_sub(b) {
        dur
    } else {
        Duration::new(0, 0)
    }
}
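As an aside, Duration::saturating_sub has since been stabilized (in Rust 1.53, as far as I know), so on a current toolchain this helper can probably be replaced with the standard method:

// Assuming a toolchain where Duration::saturating_sub is stable:
use std::time::Duration;

let a = Duration::from_millis(10);
let b = Duration::from_millis(25);
assert_eq!(a.saturating_sub(b), Duration::new(0, 0)); // clamps at zero instead of panicking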
main.rs:
use std::{cmp, sync::Arc};

type Image = Vec<Vec<Pixel>>;
#[derive(Clone)]
pub struct Pixel {
    pub luma: u8,
}
impl From<&u8> for Pixel {
    fn from(x: &u8) -> Pixel {
        Pixel { luma: *x }
    }
}

fn main() {
    // Setup
    // -------------------------------------------

    fn open_image(path: &str) -> Image {
        let example = image::open(path).unwrap().to_rgb8();
        let dims = example.dimensions();
        let size = (dims.0 as usize, dims.1 as usize);
        let example_vec = example.into_raw();

        // Binarizes image
        let img_vec = from_raw(&example_vec, size);
        img_vec
    }

    println!("Started ...");
    let example: Image = open_image("example.jpg");
    let target: Image = open_image("target.jpg");
    // let first_image = Some(Arc::new((examples[0].clone(), targets[0].clone())));
    println!("Opened...");
    let image = Some(Arc::new((example, target)));

    // Running the optimization
    // -------------------------------------------
    println!("Started opt...");
    let best = simple_optimization::random_search(
        [0..255, 0..255, 0..255, 1..255, 1..255],
        eval_one,
        image,
        Some(simple_optimization::Polling {
            poll_rate: 100,
            printing: true,
            early_exit_minimum: None,
            thread_execution_reporting: true,
        }),
        2300,
    );

    println!("{:.?}", best); // [34, 220, 43, 253, 168]
    assert!(false);

    fn eval_one(arr: &[u8; 5], opt: Option<Arc<(Image, Image)>>) -> f64 {
        let bin_params = (
            arr[0] as u8,
            arr[1] as u8,
            arr[2] as u8,
            arr[3] as usize,
            arr[4] as usize,
        );
        let arc = opt.unwrap();
        // Gets average mean-squared-error
        let binary_pixels = binarize_buffer(arc.0.clone(), bin_params);
        mse(binary_pixels, &arc.1)
    }

    // Mean-squared-error
    fn mse(prediction: Image, target: &Image) -> f64 {
        let n = target.len() * target[0].len();
        prediction
            .iter()
            .flatten()
            .zip(target.iter().flatten())
            .map(|(p, t)| difference(p, t).powf(2.))
            .sum::<f64>()
            / (2. * n as f64)
    }
    #[rustfmt::skip]
    fn difference(p: &Pixel, t: &Pixel) -> f64 {
        p.luma as f64 - t.luma as f64
    }
}

pub fn from_raw(raw: &[u8], (_i_size, j_size): (usize, usize)) -> Vec<Vec<Pixel>> {
    (0..raw.len())
        .step_by(j_size)
        .map(|index| {
            raw[index..index + j_size]
                .iter()
                .map(Pixel::from)
                .collect::<Vec<Pixel>>()
        })
        .collect()
}

pub fn binarize_buffer(
    mut img: Vec<Vec<Pixel>>,
    (_, _, local_luma_boundary, local_field_reach, local_field_size): (u8, u8, u8, usize, usize),
) -> Vec<Vec<Pixel>> {
    let (i_size, j_size) = (img.len(), img[0].len());
    let i_chunks = (i_size as f32 / local_field_size as f32).ceil() as usize;
    let j_chunks = (j_size as f32 / local_field_size as f32).ceil() as usize;

    let mut local_luma: Vec<Vec<u8>> = vec![vec![u8::default(); j_chunks]; i_chunks];

    // Gets average luma in local fields
    // O((s+r)^2*(n/s)*(m/s)) : s = local field size, r = local field reach
    for (i_chunk, i) in (0..i_size).step_by(local_field_size).enumerate() {
        let i_range = zero_checked_sub(i, local_field_reach)
            ..cmp::min(i + local_field_size + local_field_reach, i_size);
        let i_range_length = i_range.end - i_range.start;
        for (j_chunk, j) in (0..j_size).step_by(local_field_size).enumerate() {
            let j_range = zero_checked_sub(j, local_field_reach)
                ..cmp::min(j + local_field_size + local_field_reach, j_size);
            let j_range_length = j_range.end - j_range.start;

            let total: u32 = i_range
                .clone()
                .map(|i_range_indx| {
                    img[i_range_indx][j_range.clone()]
                        .iter()
                        .map(|p| p.luma as u32)
                        .sum::<u32>()
                })
                .sum();

            local_luma[i_chunk][j_chunk] = (total / (i_range_length * j_range_length) as u32) as u8;
        }
    }

    // Apply binarization
    // O(nm)
    for i in 0..i_size {
        let i_group: usize = i / local_field_size; // == floor(i as f32 / local_field_size as f32) as usize
        for j in 0..j_size {
            let j_group: usize = j / local_field_size;

            // Local average boundaries
            // --------------------------------
            if let Some(local) = local_luma[i_group][j_group].checked_sub(local_luma_boundary) {
                if img[i][j].luma < local {
                    img[i][j].luma = 0;
                    continue;
                }
            }
            if let Some(local) = local_luma[i_group][j_group].checked_add(local_luma_boundary) {
                if img[i][j].luma > local {
                    img[i][j].luma = 255;
                    continue;
                }
            }
            // White is the negative (false/0) colour in our binarization, thus this is our else case
            img[i][j].luma = 255;
        }
    }
    img
}

#[rustfmt::skip]
fn zero_checked_sub(a: usize, b: usize) -> usize { if a > b { a - b } else { 0 } }
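Incidentally, zero_checked_sub duplicates what the integer types already provide; usize::saturating_sub clamps at zero in the same way:

assert_eq!(10usize.saturating_sub(20), 0);  // equivalent to zero_checked_sub(10, 20)
assert_eq!(20usize.saturating_sub(10), 10); // equivalent to zero_checked_sub(20, 10)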

Project zip (in case you would rather not spend the time setting this up).
Otherwise, here are the images used as /target.jpg and /example.jpg (which specific images are used does not matter; any images should work):
[example and target images]
Cargo.toml dependencies:
    [dependencies]
    rand = "0.8.4"
    itertools = "0.10.1" # izip!
    num_cpus = "1.13.0" # Multi-threading
    print_duration = "1.0.0" # Printing progress
    num = "0.4.0" # Generics
    rand_distr = "0.4.1" # Normal distribution
    image = "0.23.14"
    serde = { version="1.0.118", features=["derive"] }
    serde_json = "1.0.50"

I am very reluctant to post such a large question,
since it inevitably requires people to read a few hundred lines (especially considering the project does not work in the playground), but I am genuinely lost here and cannot see any other way to communicate the whole scope of the problem. Apologies for this.
As mentioned, I have been trying for a while to figure out what is going on here and have gotten nowhere; any help would be greatly appreciated.

Best answer

Some basic debugging (a.k.a. println! everywhere) shows that your performance problem is not related to the multithreading at all. It just happens at random: when there are 24 threads doing their work, the fact that one of them randomly stalls goes unnoticed, but when only one or two are left, they stand out as slow.
But where is this performance bottleneck? Well, you state it yourself in your code: in binarize_buffer you say:

        // Gets average luma in local fields
    // O((s+r)^2*(n/s)*(m/s)) : s = local field size, r = local field reach
The values of s and r appear to be random values between 0 and 255, while n is the length of an image row in bytes, 3984 * 3 = 11952, and m is the number of rows, 2271.
Now, most of the time O() is on the order of a few million, which is quite manageable. But if s happens to be small and r large, say (3, 200), then the number of computations exceeds 1e11!
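As a rough check (plugging the image dimensions above into the formula from that comment; edge chunks change the exact count but not the order of magnitude):

(s + r)^2 * (n / s) * (m / s)
  = (3 + 200)^2 * (11952 / 3) * (2271 / 3)
  = 41209 * 3984 * 757
  ≈ 1.24e11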
Fortunately, I think you define the ranges of these values in your original call to random_search, so a little tweaking there should bring you back to a reasonable complexity. Changing the ranges to:
    [0..255, 0..255, 0..255, 1..255, 20..255],
    // ^ here
seemed to work for me.
PS: These lines at the beginning of binarize_buffer were the key to discovering this:
        let o = (i_size / local_field_size) * (j_size / local_field_size) * (local_field_size + local_field_reach).pow(2);
    println!("\nO() = {}", o);
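An alternative worth noting (not the fix applied above, just a standard technique): the (s+r)^2 factor can be removed entirely by precomputing a summed-area table (integral image) once per call, so that each local average becomes a constant-time lookup. A minimal sketch, reusing the Pixel/Image types from the question and otherwise untested:

// Summed-area table: sat[i][j] = sum of luma over the rectangle of rows 0..i and columns 0..j.
fn summed_area_table(img: &[Vec<Pixel>]) -> Vec<Vec<u64>> {
    let (rows, cols) = (img.len(), img[0].len());
    let mut sat = vec![vec![0u64; cols + 1]; rows + 1];
    for i in 0..rows {
        for j in 0..cols {
            sat[i + 1][j + 1] =
                img[i][j].luma as u64 + sat[i][j + 1] + sat[i + 1][j] - sat[i][j];
        }
    }
    sat
}

// Average luma over rows i0..i1 and columns j0..j1 (half-open ranges), in O(1) per query.
fn rect_average(sat: &[Vec<u64>], (i0, i1): (usize, usize), (j0, j1): (usize, usize)) -> u8 {
    let sum = sat[i1][j1] + sat[i0][j0] - sat[i0][j1] - sat[i1][j0];
    let area = ((i1 - i0) * (j1 - j0)) as u64;
    (sum / area) as u8
}

binarize_buffer could then fill each local_luma[i_chunk][j_chunk] by calling rect_average with the same i_range/j_range bounds it already computes, making the averaging pass O(n*m) regardless of local_field_reach, so even the unlucky small-s/large-r draws stay cheap.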

A similar question to this one (multithreading - what am I missing? Severe performance degradation over time) can be found on Stack Overflow: https://stackoverflow.com/questions/68554009/
