gpt4 book ai didi

c++ - 提高c++中矩阵(多维数组)的OpenMP多线程并行计算效率

转载 作者:行者123 更新时间:2023-11-28 05:25:13 26 4
gpt4 key购买 nike

我刚开始使用 OpenMP 在 C++ 中进行并行计算。该程序的并行性能很差。由于我不知道很多多线程分析工具(与用于单线程的简单 gprof 不同),我编写了一个示例程序来测试性能。

我有一个 2D 矩阵 (N * N),每个元素都是一个 3d vector (x, y, z)。我只是做了一个双循环来设置矩阵中的每个值:

for (int i = 0; i < N; ++i) {
for (int j = 0; j < N; ++j) {
vectorStack[i][j] = VECTOR3D(1.0*i*i, 1.0*j*j, 1.0*i*j);
}
}

VECTOR3D 是一个简单的类,具有 x, y, z 属性:

class VECTOR3D {
double x, y, z; // component along each axis
}

另一方面,我也可以使用 (N * N * 3) 3D 数组来执行此操作:

for (int i = 0; i < N; ++i) {
for (int j = 0; j < N; ++j) {
arrayHeap[i][j][0] = (1.0*i*i);
arrayHeap[i][j][1] = (1.0*j*j);
arrayHeap[i][j][2] = (1.0*i*j);
}
}

从内存方面来说,也有几种不同的选择,比如使用原始指针,手动分配和释放:

  double ***arrayHeap;
arrayHeap = new double** [N];
for(int i = 0; i < N; ++i) {
arrayHeap[i] = new double* [N];
for(int j = 0; j < N; ++j) {
arrayHeap[i][j] = new double[3];
}
}

或者简单地使用std::vector:

  vector< vector<VECTOR3D>> vectorStack(N, vector<VECTOR3D>(N, VECTOR3D(0, 0, 0)));

我还考虑过手动为数组分配连续内存,就像在 LAMMP(Molecular Simulation Package 中所做的那样。源代码。

所以我的 N=10000 结果列在这里:

对于单线程:

OMP_NUM_THREADS=1 ./a.out

在堆上为数组分配内存...

=======堆上的数组结果=======

完成时间(总计):0.720385

完成时间(实际):0.720463

为堆上的数组释放内存...

为数组连续分配内存...

=======数组连续结果=======

完成时间(总计):0.819733

完成时间(真实):0.819774

为数组连续释放内存...

在堆上为 vector 分配内存...

======= Vector on heap 结果=======

完成时间(总计):3.08715

完成时间(真实):3.08725

为堆上的 vector 解除分配内存...

为堆栈上的 vector 分配内存...

======= Vector on stack 结果=======

完成时间(总计):1.49888

完成时间(真实):1.49895

对于多线程(threads=4):

OMP_NUM_THREADS=4 ./a.out

在堆上为数组分配内存...

=======堆上的数组结果=======

完成时间(总计):2.29184

完成时间(真实):0.577807

为堆上的数组释放内存...

为数组连续分配内存...

=======数组连续结果=======

完成时间(总计):1.79501

完成时间(真实):0.454139

为数组连续释放内存...

在堆上为 vector 分配内存...

======= Vector on heap 结果=======

完成时间(总计):6.80917

完成时间(真实):1.92541

为堆上的 vector 解除分配内存...

为堆栈上的 vector 分配内存...

======= Vector on stack 结果=======

完成时间(总计):1.64086

完成时间(真实):0.411

整体并行效率不高。出乎意料,花哨的连续内存分配没有帮助?!为什么会这样?对于这种情况,std::vector 似乎足够好?

有人可以为我解释一下结果吗?我还需要有关如何改进代码的建议。

非常感谢!!!

附上全部源码。(请直接上main,开头有几个手动管理内存的函数)。

#include <iostream>
#include <omp.h>
#include <vector>
#include <stdlib.h>
#include <cinttypes>
#include "vector3d.h"

typedef int64_t bigint;

void *smalloc(bigint nbytes, const char *name)
{
if (nbytes == 0) return NULL;
void *ptr = malloc(nbytes);
return ptr;
}

template <typename TYPE>
TYPE ***create(TYPE ***&array, int n1, int n2, int n3, const char *name)
{
bigint nbytes = ((bigint) sizeof(TYPE)) * n1*n2*n3;
TYPE *data = (TYPE *) smalloc(nbytes,name);
nbytes = ((bigint) sizeof(TYPE *)) * n1*n2;
TYPE **plane = (TYPE **) smalloc(nbytes,name);
nbytes = ((bigint) sizeof(TYPE **)) * n1;
array = (TYPE ***) smalloc(nbytes,name);

int i,j;
bigint m;
bigint n = 0;
for (i = 0; i < n1; i++) {
m = ((bigint) i) * n2;
array[i] = &plane[m];
for (j = 0; j < n2; j++) {
plane[m+j] = &data[n];
n += n3;
}
}
return array;
}

template <typename TYPE>
TYPE ***create3d_offset(TYPE ***&array, int n1lo, int n1hi,
int n2, int n3, const char *name)
{
int n1 = n1hi - n1lo + 1;
create(array,n1,n2,n3,name);
array -= n1lo;
return array;
}

void sfree(void *ptr) {
if (ptr == NULL) return;
free(ptr);
}


template <typename TYPE>
void destroy(TYPE ***&array)
{
if (array == NULL) return;
sfree(array[0][0]);
sfree(array[0]);
sfree(array);
array = NULL;
}

template <typename TYPE>
void destroy3d_offset(TYPE ***&array, int offset)
{
if (array == NULL) return;
sfree(&array[offset][0][0]);
sfree(&array[offset][0]);
sfree(&array[offset]);
array = NULL;
}


////////////////////////////////////////////////////////
////////////////////////////////////////////////////////
////////////////////////////////////////////////////////

int main() {
using namespace std;

const int N = 10000;


///////////////////////////////////////

double sum = 0.0;
clock_t t;
double startTime, stopTime, secsElapsed;


printf("\n\nAllocating memory for array on heap...\n");
double ***arrayHeap;
arrayHeap = new double** [N];
for(int i = 0; i < N; ++i) {
arrayHeap[i] = new double* [N];
for(int j = 0; j < N; ++j) {
arrayHeap[i][j] = new double[3];
}
}


printf("======= Array on heap Results =======\n");

sum = 0.0;
t = clock();
startTime = omp_get_wtime();

#pragma omp parallel
{
//#pragma omp for schedule(dynamic)
//#pragma omp for collapse(2)
#pragma omp for
for (int i = 0; i < N; ++i) {
for (int j = 0; j < N; ++j) {
arrayHeap[i][j][0] = (1.0*i*i);
arrayHeap[i][j][1] = (1.0*j*j);
arrayHeap[i][j][2] = (1.0*i*j);
}
}

}

t = clock() - t;
cout << "Finished within time (total): " << ((double) t) / CLOCKS_PER_SEC << " seconds" << endl;
stopTime = omp_get_wtime();
secsElapsed = stopTime - startTime;
cout << "Finished within time (real): " << secsElapsed << " seconds" << endl;

printf("Deallocating memory for array on heap...\n");
for(int i = 0; i < N; ++i) {
for(int j = 0; j < N; ++j) {
delete [] arrayHeap[i][j];
}
delete [] arrayHeap[i];
}
delete [] arrayHeap;


///////////////////////////////////////


printf("\n\nAllocating memory for array continous...\n");
double ***array_continuous;
create3d_offset(array_continuous,0, N, N, 3, "array");

printf("======= Array continuous Results =======\n");

sum = 0.0;
t = clock();
startTime = omp_get_wtime();

#pragma omp parallel
{
//#pragma omp for schedule(dynamic)
//#pragma omp for collapse(2)
#pragma omp for
for (int i = 0; i < N; ++i) {
for (int j = 0; j < N; ++j) {
array_continuous[i][j][0] = (1.0*i*i);
array_continuous[i][j][1] = (1.0*j*j);
array_continuous[i][j][2] = (1.0*i*j);
}
}

}

t = clock() - t;
cout << "Finished within time (total): " << ((double) t) / CLOCKS_PER_SEC << " seconds" << endl;
stopTime = omp_get_wtime();
secsElapsed = stopTime - startTime;
cout << "Finished within time (real): " << secsElapsed << " seconds" << endl;


printf("Deallocating memory for array continuous...\n");
destroy3d_offset(array_continuous, 0);



///////////////////////////////////////k



printf("\n\nAllocating memory for vector on heap...\n");
VECTOR3D ***vectorHeap;
vectorHeap = new VECTOR3D**[N];
for(int i = 0; i < N; ++i) {
vectorHeap[i] = new VECTOR3D* [N];
for(int j = 0; j < N; ++j) {
vectorHeap[i][j] = new VECTOR3D(0,0,0);
}
}

printf("======= Vector on heap Results =======\n");
sum = 0.0;
t = clock();
startTime = omp_get_wtime();

#pragma omp parallel
{
//#pragma omp for schedule(dynamic)
//#pragma omp for collapse(2)
#pragma omp for
for (int i = 0; i < N; ++i) {
for (int j = 0; j < N; ++j) {
vectorHeap[i][j] = new VECTOR3D(1.0*i*i, 1.0*j*j, 1.0*i*j);
}
}

}

t = clock() - t;
cout << "Finished within time (total): " << ((double) t) / CLOCKS_PER_SEC << " seconds" << endl;
stopTime = omp_get_wtime();
secsElapsed = stopTime - startTime;
cout << "Finished within time (real): " << secsElapsed << " seconds" << endl;

printf("Deallocating memory for vector on heap...\n");
for(int i = 0; i < N; ++i) {
for(int j = 0; j < N; ++j) {
delete [] vectorHeap[i][j];
}
delete [] vectorHeap[i];
}
delete [] vectorHeap;


/////////////////////////////////////////////////

printf("\n\nAllocating memory for vector on stack...\n");
vector< vector<VECTOR3D>> vectorStack(N, vector<VECTOR3D>(N, VECTOR3D(0, 0, 0)));

printf("======= Vector on stack Results =======\n");
sum = 0.0;
t = clock();
startTime = omp_get_wtime();

#pragma omp parallel
{
//#pragma omp for schedule(dynamic)
//#pragma omp for collapse(2)
#pragma omp for
for (int i = 0; i < N; ++i) {
for (int j = 0; j < N; ++j) {
vectorStack[i][j] = VECTOR3D(1.0*i*i, 1.0*j*j, 1.0*i*j);
}
}

}

t = clock() - t;
cout << "Finished within time (total): " << ((double) t) / CLOCKS_PER_SEC << " seconds" << endl;
stopTime = omp_get_wtime();
secsElapsed = stopTime - startTime;
cout << "Finished within time (real): " << secsElapsed << " seconds" << endl;


/////////////////////////////////



return 0;
}

还有 VECTOR3D 类:

#ifndef _VECTOR3D_H
#define _VECTOR3D_H

#include <iostream>
#include <cmath>
#include <iomanip>
#include <limits>

class VECTOR3D {
public:
double x, y, z; // component along each axis (cartesian)
VECTOR3D(double xx = 0.0, double yy = 0.0, double zz = 0.0) : x(xx), y(yy), z(zz) // make a 3d vector
{
}
}

最佳答案

一般误解

您的简单循环不受计算限制,但完全受内存限制:您只访问每个元素一次。不重用意味着您无法有效地使用缓存。因此,您不能期望加速等于使用的线程/核心数。实际加速取决于具体系统(内存带宽)。

间接

您的所有数据结构,包括奇特的连续内存,都对数据访问执行许多间接操作。这不是绝对必要的。要充分利用连续内存,您应该简单地布置二维数组:

template<class T>
class Array2d
{
public:
Array2d(size_t rows, size_t columns) : rows_(rows), columns_(columns), data_(rows_ * columns_) {}
T& operator()(size_t r, size_t c)
{
return data_[r * columns_ + c];
}

const T& operator()(size_t r, size_t c) const
{
return data_[r * columns_ + c];
}

private:
size_t rows_;
size_t columns_;
std::vector<T> data_;
};

注意:如果你真的必须保留 [i] [j] 索引。

澄清

如果您受到内存带宽的限制并且 N 足够大,则间接或平面布局之间不会有明显的性能差异。

关于c++ - 提高c++中矩阵(多维数组)的OpenMP多线程并行计算效率,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40700927/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com