
c++ - How to replicate the functionality of MPI_Accumulate in MPI-2+


I am learning the MPI one-sided communication introduced in MPI-2/MPI-3, and came across this online course page about MPI_Accumulate:

MPI_Accumulate allows the caller to combine the data moved to the target process with data already present, such as accumulation of a sum at a target process. The same functionality could be achieved by using MPI_Get to retrieve data (followed by synchronization); performing the sum operation at the caller; then using MPI_Put to send the updated data back to the target process. Accumulate simplifies this messiness ...
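
For reference, the operation being described is a single MPI_Accumulate call with one of the predefined reduction operations; a minimal sketch of the sum-of-one-int form used later in this post (target_rank, disp, and win are placeholders for whatever window setup is in use):

int increment = 1;
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target_rank, 0, win);
MPI_Accumulate(&increment, 1, MPI_INT,        /* origin buffer: one int */
               target_rank, disp, 1, MPI_INT, /* target rank, displacement, count */
               MPI_SUM, win);                 /* predefined reduction op */
MPI_Win_unlock(target_rank, win);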

MPI_Accumulate only allows a limited set of predefined operations (max, min, sum, product, etc.); user-defined operations are not supported. I would like to know how to implement the "messiness" described above using MPI_Get, synchronization, a local operation, and MPI_Put. Are there any C/C++ tutorials or working code examples?

Thanks


For testing, I adapted a piece of code from this SO question, in which one-sided communication is used to create an integer counter that is kept synchronized across MPI processes. The problem line, which uses MPI_Accumulate, is marked in the code.

The code compiles as-is and finishes after about 15 seconds. However, when I try to replace MPI_Accumulate with the equivalent sequence of basic operations (shown in the comment block following the problem line), the compiled program hangs indefinitely.

Can anyone help explain what is going wrong, and what the correct way to replace MPI_Accumulate is in this context?

P.S. I compiled the code with

g++ -std=c++11 -I..   mpistest.cpp -lmpi

and executed the binary with

mpiexec -n 4 a.exe

Code:

//adapted from https://stackoverflow.com/questions/4948788/
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <thread>
#include <chrono>

struct mpi_counter_t {
    MPI_Win win;
    int hostrank;  //id of the process that hosts the values exposed to all processes
    int rank;      //process id
    int size;      //number of processes
    int val;
    int *hostvals;
};

struct mpi_counter_t *create_counter(int hostrank) {
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) {
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->hostvals));
        for (int i = 0; i < count->size; i++) count->hostvals[i] = 0;
        MPI_Win_create(count->hostvals, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    } else {
        count->hostvals = NULL;
        MPI_Win_create(count->hostvals, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    }
    count->val = 0;

    return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc(count->size * sizeof(int));
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i = 0; i < count->size; i++) {

        if (i == count->rank) {
            MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM, count->win); //Problem line: increment hostvals[i] on host
            /* //Question: How to correctly replace the above MPI_Accumulate call with the following sequence? Currently, the following causes the program to hang.
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
            MPI_Win_fence(0, count->win);
            vals[i] += increment;
            MPI_Put(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
            MPI_Win_fence(0, count->win);
            //*/
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(0, count->win);

    //do op part of MPI_Accumulate's work on count->rank
    count->val += increment;
    vals[count->rank] = count->val;

    //return the sum of vals
    val = 0;
    for (int i = 0; i < count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

void delete_counter(struct mpi_counter_t **count) {
    if ((*count)->rank == (*count)->hostrank) {
        MPI_Free_mem((*count)->hostvals);
    }
    MPI_Win_free(&((*count)->win));
    free(*count);
    *count = NULL;

    return;
}

void print_counter(struct mpi_counter_t *count) {
    if (count->rank == count->hostrank) {
        for (int i = 0; i < count->size; i++) {
            printf("%2d ", count->hostvals[i]);
        }
        puts("");
    }
}


int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    const int WORKITEMS = 50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srand(rank);

    while (result < WORKITEMS) {
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) {
            printf("%d working on item %d...\n", rank, result);
            std::this_thread::sleep_for(std::chrono::seconds(rand() % 2));
        } else {
            printf("%d done\n", rank);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);


    MPI_Finalize();
    return 0;
}

One more question: should I be using MPI_Win_fence here instead of locks?

--EDIT--

I used lock/unlock in increment_counter as shown below. The program runs, but behaves strangely: in the final printout, the host process has done all the work. Still confused.

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc(count->size * sizeof(int));
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i = 0; i < count->size; i++) {

        if (i == count->rank) {
            //MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM, count->win); //Problem line: increment hostvals[i] on host
            ///* //Question: How to correctly replace the above MPI_Accumulate call with the following sequence? Reports that 0 does all the work.
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
            MPI_Win_unlock(0, count->win);
            vals[i] += increment;
            MPI_Put(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
            MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
            //*/
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(0, count->win);

    //do op part of MPI_Accumulate's work on count->rank
    count->val += increment;
    vals[count->rank] = count->val;

    //return the sum of vals
    val = 0;
    for (int i = 0; i < count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

Best Answer

Implementing Accumulate with Gets and Puts will indeed be very messy, especially when you have to deal with derived datatypes and so on. But assuming you are accumulating a single integer and just want to sum a local value into a remote buffer, you can do something like the following (pseudocode only):

MPI_Win_lock(EXCLUSIVE);  /* exclusive needed for accumulate atomicity constraints */
MPI_Get(&remote_data);
MPI_Win_flush(win); /* make sure GET has completed */
new = local_data + remote_data;
MPI_Put(&new);
MPI_Win_unlock();

Your code is not correct because you give up the exclusive lock after the GET, which causes atomicity problems when two processes try to sum into the data at the same time.
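
Applied to the code in the question, a drop-in sketch of increment_counter along those lines might look as follows (a sketch only, not tested here; it assumes MPI_Win_flush from MPI-3 is available and uses count->hostrank where the original hard-coded 0):

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc(count->size * sizeof(int));
    int val = 0;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
    for (int i = 0; i < count->size; i++) {
        if (i == count->rank) {
            /* Get -> flush -> local sum -> Put, all inside one exclusive-lock epoch */
            MPI_Get(&vals[i], 1, MPI_INT, count->hostrank, i, 1, MPI_INT, count->win);
            MPI_Win_flush(count->hostrank, count->win); /* make sure the Get has completed */
            vals[i] += increment;
            MPI_Put(&vals[i], 1, MPI_INT, count->hostrank, i, 1, MPI_INT, count->win);
            /* the Put completes when the lock is released below */
        } else {
            MPI_Get(&vals[i], 1, MPI_INT, count->hostrank, i, 1, MPI_INT, count->win);
        }
    }
    MPI_Win_unlock(count->hostrank, count->win);

    //do op part of MPI_Accumulate's work on count->rank
    count->val += increment;
    vals[count->rank] = count->val;

    //return the sum of vals
    for (int i = 0; i < count->size; i++) val += vals[i];
    free(vals);
    return val;
}

As for the original hang: MPI_Win_fence is active-target, collective synchronization over the whole window, so calling it inside a passive-target lock epoch, and from only one process at a time, cannot work; the lock/unlock-in-the-middle variant from the edit avoids the hang but gives up exactly the atomicity constraint described above.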

For "c++ - How to replicate the functionality of MPI_Accumulate in MPI-2+", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/24709601/
