gpt4 book ai didi

multithreading - 为什么线程越多这个程序越慢?

转载 作者:行者123 更新时间:2023-11-29 08:34:16 25 4
gpt4 key购买 nike

与仅使用一个线程相比,我的程序使用两个线程运行时的执行时间是其两倍。

我创建了 a minimal example program with the same problem使用 scoped-pool :

#![feature(test)]

extern crate scoped_pool;
extern crate test;

use scoped_pool::Pool;
use test::Bencher;

/// This is a minimized program exhibiting a performance problem
/// Why is this program twice as fast, when the number of threads is set to 1 instead of 2?
#[bench]
pub fn test_bench_alt(b: &mut Bencher) {
let parallellism = 1;
let data_size = 500_000;

let mut pool = Pool::new(parallellism);

{
let mut data = Vec::new();
for _ in 0..data_size {
data.push(0);
}

let mut output_data = Vec::<Vec<i32>>::new();
for _ in 0..parallellism {
let mut t = Vec::<i32>::with_capacity(data_size / parallellism);
output_data.push(t);
}
b.iter(move || {
for i in 0..parallellism {
output_data[i].clear();
}
{
let mut output_data_ref = &mut output_data;
let data_ref = &data;
pool.scoped(move |scope| {
for (idx, output_data_bucket) in output_data_ref.iter_mut().enumerate() {
scope.execute(move || {
for item in &data_ref[(idx * (data_size / parallellism))
..((idx + 1) * (data_size / parallellism))]
{
//Yes, this is a logic bug when parallellism does not evenely divide data_size. I could use "chunks" to avoid this, but I wanted to keep this simple for this analysis.
output_data_bucket.push(*item);
}
});
}
});
}
let mut output_data_ref = &mut output_data;
pool.scoped(move |scope| {
for sub in output_data_ref.iter_mut() {
scope.execute(move || {
for sublot in sub {
assert!(*sublot != 42);
}
});
}
});
});
}
}

fn main() {}

这是一个程序,它接受一个输入向量,在每个线程中处理该向量的一部分,将每个线程的输出聚合到一个向量中,然后处理结果向量。实际程序更复杂,但这个最小化的程序仍然存在性能问题,即使它没有做任何有值(value)的事情。

运行长凳:

用一个线程:

test test_bench_alt ... bench:     781,105 ns/iter (+/- 1,103)

有两个线程:

test test_bench_alt ... bench:   1,537,465 ns/iter (+/- 154,499)

为什么用两个线程运行时程序会变慢?可以做些什么来让它更快?

更新:

以下高度优化的 C++ 程序执行大致相同的工作,并且(在我的机器上)最多可扩展到 19 个线程,证明工作负载实际上可以并行化。

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <vector>
#include <chrono>
#include <sched.h>
#include <atomic>



#define PAR 1
#define DATASIZE 524288

std::vector<std::vector<int>> output;
std::vector<int> input;


int run_job1(int task) {

int l = DATASIZE/PAR;
int off = task*(DATASIZE/PAR);
auto temp = &output[task][0];
auto ip = &input[off];
for(int i=0;i<l;++i){
*temp=*ip;//+off;
temp+=1;
ip+=1;
}
return 0;
}


int run_job2(int task) {
auto& temp = output[task];
auto temp_p = &output[task][0];
auto temp_p2 = temp_p + DATASIZE/PAR;
int expected = task*(DATASIZE/PAR);
while(temp_p!=temp_p2) {
if (*temp_p!=expected)
printf("Woha!\n");
temp_p+=1;
expected+=1;
}
return 0;
}

std::atomic_int valsync=0;
std::atomic_int valdone=0;

void* threadfunc(void* p) {
int i = (int)(long)p;
cpu_set_t set;
CPU_ZERO(&set);
CPU_SET(i, &set);
sched_setaffinity(0, sizeof(set),&set);

int expect=1;
while(true) {
while(valsync.load()!=expect) {
}
expect+=1;
run_job1(i);
valdone+=1;

while(valsync.load()!=expect) {
}
expect+=1;
run_job2(i);
valdone+=1;
}

}

int main() {

for(int i=0;i<DATASIZE;++i) {
input.push_back(i);
}
for(int i=0;i<PAR;++i) {
std::vector<int> t;
for(int j=0;j<DATASIZE/PAR;++j)
t.push_back(0);
output.push_back(t);
}
for (int i = 0; i < PAR ; ++i)
{
pthread_t thread_id;
if(pthread_create(&thread_id, NULL, threadfunc, (void*)i)) {

fprintf(stderr, "Error creating thread\n");
return 1;

}
}
for(int run=0;run<20;++run)
{
std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
for(int j=0;j<1000;++j) {

std::atomic_fetch_add(&valsync,1);
while(true) {
int expected=PAR;
if (std::atomic_compare_exchange_strong(&valdone,&expected,0))
break;

}


std::atomic_fetch_add(&valsync,1);
while(true) {
int expected=PAR;
if (std::atomic_compare_exchange_strong(&valdone,&expected,0))
break;
}
}
std::chrono::steady_clock::time_point t2= std::chrono::steady_clock::now();
auto delta = t2-t1;

std::cout<<"Time: "<<std::chrono::duration_cast<std::chrono::nanoseconds>(delta).count()/1000<<" ns per iter \n";
}

return 0;
}

最佳答案

主要问题是这个基准几乎毫无意义。分配和比较数字不是计算密集型操作,这意味着并行化这些操作几乎没有任何值(value)。正如这些测量结果所示,添加更多线程只会削弱性能。

令人惊讶的是,最大的瓶颈可能在于构建输出向量时出现的其他琐碎指令,而迭代器可以避免这些指令。大多数与向量的交互都依赖索引运算符 [] 来迭代集合,这是非常规且不明智的。这是同一基准的改进版本。这些变化总结如下:

  • 可以使用 vec 宏来初始化具有特定元素的向量:vec![0; data_size].
  • 还可以使用迭代器构建 N 个初始向量。使用 Vec::new 创建的空向量不会分配堆内存,所以这基本上没问题。
  • 在为每个 worker 分配工作时,输入内存块和输出向量可以压缩在一起。两个 block 都会自动进行迭代,所需的边界检查要少得多。此外,由于 chunks,如果最后一个 worker 被分配了一个较小的 slice,它就不会尝试越界访问。
  • 每个线程的工作也可以通过迭代器进行并收集到一个新的向量中,而不是产生一个循环,在每个步骤中将新值推送到现有的可变向量。编译器可以使用这种方法避免许多冗余检查。
  • 最后,基准测试的第二部分不需要对“已处理”内容进行可变访问。
#[bench]
pub fn test_bench_alt(b: &mut Bencher) {
let parallellism = 1;
let data_size = 500_000;

let pool = Pool::new(parallellism);

{
let data = vec![0; data_size];

let mut output_data: Vec<_> = (0..parallellism).map(|_| Vec::new()).collect();

b.iter(move || {
for vec in &mut output_data {
vec.clear();
}

{
let data_ref = &data;
pool.scoped(|scope| {
for (output_data_bucket, input_data_chunk) in (&mut output_data)
.into_iter()
.zip(data_ref.chunks(data_size / parallellism))
{
scope.execute(move || {
*output_data_bucket = input_data_chunk.into_iter().cloned().collect();
})
}
});
}
pool.scoped(|scope| {
for sub in &output_data {
scope.execute(move || {
for sublot in sub {
assert_ne!(*sublot, 42);
}
});
}
});
});
}
}

之前:

test test_bench_alt ... bench:   1,352,071 ns/iter (+/- 516,762)

之后:

test test_bench_alt ... bench:     533,573 ns/iter (+/- 213,486)

这些数字可能只在线程数增加的情况下稍微好一点,尽管方差更大。对于并行度 = 2:

test test_bench_alt ... bench:     314,662 ns/iter (+/- 340,636)

如果您将计算密集型算法引入等式,那么您可以考虑这些想法再试一次。

关于multithreading - 为什么线程越多这个程序越慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51515991/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com