
c - MPI asynchronous / one-sided communication


I have a situation similar to the code below: worker processes handle a subset of the data, and each has to send an unknown amount of data back to the master. Is it possible to have the master wait for, and receive, an unknown number of sends from the workers? Is there a way to do this using one-sided communication? Thanks in advance!

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

/*
sample run/output:
$mpirun -np 5 practice.exe
@[1]: i=30
@[2]: i=0
@[2]: i=75
@[4]: i=40
@[4]: i=55
@[3]: i=85
@[3]: i=65
*/
int main(int argc, char *argv[])
{
    int i, rank, size, np, nw, num;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 5 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                // SEND num TO MASTER
            }
        }
    }
    else
    {
        // RECEIVE num FROM WORKER
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Best Answer

Sure, there are lots of ways to do this, but it really doesn't have anything to do with asynchronous communication. You can do it with one-sided communication, but even that has its own problems here (you still have to be able to guess an upper bound on how much total memory the data will need).
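To make that one-sided option concrete, here is a minimal sketch of my own (not part of the original answer) using MPI_Win_create, MPI_Put, and MPI_Win_fence. It shows exactly the limitation just mentioned: the master must preallocate a window big enough for the worst case. The row layout, the winbuf name, and the reuse of MAXPERWORKER are my assumptions; slot 0 of each worker's row holds that worker's count:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define MAXPERWORKER 10   /* upper bound we must guess for one-sided */

int main(int argc, char *argv[])
{
    int i, rank, np, nw, num;
    int *winbuf = NULL;
    MPI_Win win;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;
    srand(time(NULL)*rank);

    /* Master exposes one row of MAXPERWORKER+1 ints per worker; slot 0
     * of each row will hold that worker's count. Everyone else exposes
     * an empty window (MPI_Win_create is collective). */
    if (rank == 0) {
        MPI_Alloc_mem((MPI_Aint)(nw*(MAXPERWORKER+1)*sizeof(int)),
                      MPI_INFO_NULL, &winbuf);
        for (i=0; i<nw*(MAXPERWORKER+1); i++)
            winbuf[i] = 0;
        MPI_Win_create(winbuf, (MPI_Aint)(nw*(MAXPERWORKER+1)*sizeof(int)),
                       sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);
    } else {
        MPI_Win_create(NULL, 0, sizeof(int), MPI_INFO_NULL,
                       MPI_COMM_WORLD, &win);
    }

    MPI_Win_fence(0, win);
    if (rank > 0) {
        int mynums[MAXPERWORKER], numcount = 0;
        MPI_Aint base = (MPI_Aint)((rank-1)*(MAXPERWORKER+1));

        for (i=(rank-1); i<(nw*10); i+=nw) {
            num = rand() % 100;
            if (num % 3 == 0)
                mynums[numcount++] = num;
        }
        /* put our count into slot 0 of our row, then the data after it */
        MPI_Put(&numcount, 1, MPI_INT, 0, base, 1, MPI_INT, win);
        MPI_Put(mynums, numcount, MPI_INT, 0, base+1, numcount, MPI_INT, win);
    }
    MPI_Win_fence(0, win);   /* all puts are complete after this fence */

    if (rank == 0) {
        for (i=0; i<nw; i++) {
            int *row = &winbuf[i*(MAXPERWORKER+1)];
            int j;
            printf("From [%2d]:", i+1);
            for (j=0; j<row[0]; j++)
                printf("%3d ", row[j+1]);
            printf("\n");
        }
    }

    MPI_Win_free(&win);
    if (rank == 0) MPI_Free_mem(winbuf);
    MPI_Finalize();
    return EXIT_SUCCESS;
}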

One way is simply to figure out how much data you have, send that count to the master so it knows how many messages to expect, and then send the data one item at a time:

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>      /* for time(), used to seed rand() */

#define MAXPERWORKER 10
#define TAG_NUM_INCOMING 1
#define TAG_DATA 2

int main(int argc, char *argv[])
{
    int i, rank, np, nw, num;
    int mynums[MAXPERWORKER], numcount, total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        numcount = 0;
        total = 0;
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 3 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                mynums[numcount] = num;
                numcount++;
                total += num;
            }
        }
        /* of course, in this case we could just
         * do this in one message, but..
         */
        MPI_Send(&numcount, 1, MPI_INT, 0, TAG_NUM_INCOMING, MPI_COMM_WORLD);
        for (i=0; i<numcount; i++)
            MPI_Send(&(mynums[i]), 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);

        printf("@[%d]: Total of all nums is %d\n", rank, total);
    }
    else
    {
        int *totals = malloc(sizeof(int)*nw);
        int *counts = malloc(sizeof(int)*nw);
        int *sofar  = malloc(sizeof(int)*nw);
        int **data  = malloc(sizeof(int *)*nw);
        int rcv;
        int totalcounts = 0;   /* must start at zero; we accumulate into it */
        int j;
        int workernum;
        MPI_Status status;

        for (i=0; i<nw; i++) {
            sofar[i]  = 0;
            totals[i] = 0;
        }

        /* get number of incoming messages */
        for (i=0; i<nw; i++) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, TAG_NUM_INCOMING, MPI_COMM_WORLD, &status);

            workernum = status.MPI_SOURCE-1;
            counts[workernum] = rcv;
            totalcounts += rcv;
            data[workernum] = malloc(sizeof(int)*rcv);
        }

        /* get real data */
        for (i=0; i<totalcounts; i++) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, TAG_DATA, MPI_COMM_WORLD, &status);
            workernum = status.MPI_SOURCE-1;
            data[ workernum ][ sofar[workernum]++ ] = rcv;
            totals[ workernum ] += rcv;
        }

        /* print results */
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", data[i][j]);
            printf("| %3d\n", totals[i]);
        }

        for (i=0; i<nw; i++)
            free(data[i]);
        free(data);
        free(totals);
        free(counts);
        free(sofar);
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Running this on 4 processes, I get:

$ mpirun -np 4 ./masterworker1

@[1]: i=39
@[1]: i=81
@[3]: i=9
@[3]: i=45
@[3]: i=0
@[3]: i=57
@[3]: Total of all nums is 111
@[1]: Total of all nums is 120
From [ 1]: 39 81 | 120
From [ 2]: 24 6 39 | 69
From [ 3]: 9 45 0 57 | 111
@[2]: i=24
@[2]: i=6
@[2]: i=39
@[2]: Total of all nums is 69

However, this may not be feasible -- you may not want to buffer all of your data like this (and if you could, you could just send everything in a single message anyway).
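As an aside (this sketch is mine, not part of the original answer): if the workers do buffer, that single-message variant doesn't even need the separate count message, because the master can use MPI_Probe plus MPI_Get_count to learn each message's length before posting the receive. The fragment below would replace the count message and the per-item send loop in the listing above, assuming the same setup with mynums[] filled and numcount set on each worker:

/* Sketch: one message per worker, sized on the fly at the master. */
if (rank > 0) {
    MPI_Send(mynums, numcount, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);
} else {
    int incoming;
    MPI_Status status;
    for (i=0; i<nw; i++) {
        /* wait for any worker's message, then ask how long it is */
        MPI_Probe(MPI_ANY_SOURCE, TAG_DATA, MPI_COMM_WORLD, &status);
        MPI_Get_count(&status, MPI_INT, &incoming);
        int *buf = malloc(sizeof(int)*incoming);
        /* receive from exactly the rank we probed, so the messages match */
        MPI_Recv(buf, incoming, MPI_INT, status.MPI_SOURCE, TAG_DATA,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        /* ... buf[0..incoming-1] holds everything from that worker ... */
        free(buf);
    }
}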

Another approach is to send the data as it is produced and then, once a worker has finished sending, send one special message; the master just keeps receiving until it has heard one of those "done" messages from every worker:

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>      /* for time(), used to seed rand() */

#define MAXPERWORKER 10
#define TAG_DATA 2
#define TAG_DONE 1

int main(int argc, char *argv[])
{
    int i, rank, np, nw, num;
    int total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        total = 0;
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 3 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                total += num;
                MPI_Send(&num, 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);
            }
        }
        /* the payload here is ignored; the TAG_DONE tag is the signal */
        MPI_Send(&num, 1, MPI_INT, 0, TAG_DONE, MPI_COMM_WORLD);

        printf("@[%d]: Total of all nums is %d\n", rank, total);
    }
    else
    {
        int *totals = malloc(sizeof(int)*nw);
        int *counts = malloc(sizeof(int)*nw);
        int **data  = malloc(sizeof(int *)*nw);
        int rcv;
        int j;
        int workernum;
        int stillsending;
        MPI_Status status;

        for (i=0; i<nw; i++) {
            totals[i] = 0;
            counts[i] = 0;
            data[i] = malloc(sizeof(int)*MAXPERWORKER);
        }
        stillsending = nw;

        /* get data */
        while (stillsending > 0) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

            workernum = status.MPI_SOURCE-1;
            if (status.MPI_TAG == TAG_DONE) {
                stillsending--;
            } else if (status.MPI_TAG == TAG_DATA) {
                data[workernum][counts[workernum]] = rcv;
                totals[workernum] += rcv;
                counts[workernum]++;
            }
        }

        /* print results */
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", data[i][j]);
            printf("| %3d\n", totals[i]);
        }

        for (i=0; i<nw; i++)
            free(data[i]);
        free(data);
        free(totals);
        free(counts);
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Again running on 4 tasks, I get:

$ mpirun -np 4 ./masterworker2

@[1]: i=63
@[1]: i=99
@[1]: i=60
@[1]: i=69
@[1]: i=21
@[1]: i=48
@[1]: i=24
@[1]: Total of all nums is 384
@[2]: i=39
@[2]: i=84
@[2]: i=63
@[2]: Total of all nums is 186
@[3]: i=3
@[3]: i=51
@[3]: i=36
@[3]: Total of all nums is 90
From [ 1]: 63 99 60 69 21 48 24 | 384
From [ 2]: 39 84 63 | 186
From [ 3]: 3 51 36 | 90

Note that in both cases I've relied on MAXPERWORKER-sized arrays to preallocate things. You don't really need them, though -- you could malloc an array and realloc it as needed, or, if you're willing to use C++, use a std::vector.
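For what it's worth, a minimal sketch of that realloc route (my own helper, with a hypothetical name append_num and a capacity-doubling policy; it assumes the stdio.h/stdlib.h/mpi.h includes from the listings above):

/* Append value to a growable int array, doubling the capacity as needed. */
void append_num(int **buf, int *count, int *capacity, int value)
{
    if (*count == *capacity) {
        int newcap = (*capacity == 0) ? 4 : 2*(*capacity);
        int *tmp = realloc(*buf, sizeof(int)*newcap);
        if (tmp == NULL) {
            perror("realloc");
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        }
        *buf = tmp;
        *capacity = newcap;
    }
    (*buf)[(*count)++] = value;
}

In the second master loop, each data[i] would then start out as NULL with a capacity of 0, and the TAG_DATA branch would call append_num(&data[workernum], &counts[workernum], &caps[workernum], rcv) instead of indexing into a fixed MAXPERWORKER array.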

About c - MPI asynchronous/one-sided communication: we found a similar question on Stack Overflow: https://stackoverflow.com/questions/6612421/
