gpt4 book ai didi

c - 使用多个进程读取文件并通过pipe()发送数字

转载 作者:太空宇宙 更新时间:2023-11-04 01:25:21 25 4
gpt4 key购买 nike

我必须使用fork(2)来生成用户输入的尽可能多的孩子。
然后我需要他们把工作分割开来,读取一个包含坐标点的txt文件,比较它们之间的距离和输入的距离。
然后,他们将给定距离内的点数相加。每个子项必须将其计数写入管道,父项必须读取每个计数并将其添加到总计中,然后将其打印出来。这是我的代码:

int main( int argc, char *argv[] ) {
int distance = atoi(argv[1]);
if ( argc != 3 || sscanf( argv[ 1 ], "%d", &distance ) != 1 )
fail( "usage: pairs <distance>" );
readPoints();
int workers = atoi(argv[2]);

// Compute the square of the distance bound, since that's what we'll
// need to compare against.
int dsq = distance * distance;
// Count up the number of nearby pairs of points.
int total = 0;

int fd[2]; // pipe
if ( pipe( fd ) != 0 ){
fail( "Can't create pipe" );
}
int pid; // child
int chNum; // child's number
int c;
for( chNum = 0; chNum < workers; chNum++){
c = 0;
pid = fork();
if ( pid == -1 ){ //failure
fail( "Can't create child process" );
}
if( pid ==0 ){ // it's a child
for ( int i =chNum; i < ptCount; i+=workers)
for ( int j = i + 1; j < ptCount; j++ ) {
// Check the squared distance.
int dx = ptList[ i ].x - ptList[ j ].x;
int dy = ptList[ i ].y - ptList[ j ].y;
if ( dx * dx + dy * dy <= dsq )
c++;
}
close(fd[READ]);
lockf(fd[WRITE], F_LOCK,0);
write(fd[WRITE], &c, sizeof(c));
lockf(fd[WRITE], F_ULOCK,0);
close(fd[WRITE]);
exit(0);
}
else if(pid>0){ // this is parent
int d;
close(fd[WRITE]);
read(fd[READ], &d, sizeof(d));
close(fd[READ]);
total = total + d;
}
}
if(pid>0){
wait(NULL);
printf( "Total: %d\n", total );
}
return 0;
}

我使用for循环使 fork(2)的子级,然后让它们计算计数并将其发送到管道以供父级读取。父对象读入 d并将其添加到 total。我想知道我是否正确使用管道将每个孩子的计数发送给家长和/或我是否正确分叉,因此它只来自一个家长。当我使用一个以上的孩子时,我得到的总数是错误的。
如果我使用一个child,那么总的结果是166428,这是正确的,但是当我使用4时,它给出了164908。有人能帮我吗?

最佳答案

你管得不好。
首先,您不需要锁定/解锁对管道的写入和读取:小于PIPE_BUF字节的写入保证是原子的。POSIX.1-2001要求PIPE_BUF至少为512字节;因为一次只写入sizeof(int)字节,所以是安全的(除非sizeof(int)大于或等于512,这是胡说)。请参见路径名变量值下的man limits.h
{管道}
保证为原子的最大字节数
当给烟斗写信时。最小可接受值:{u POSIX_PIPE_BUF}
这本身就简化了代码并减少了不必要的锁定/解锁开销。
但真正的问题是:

else if (pid > 0) { // this is parent
int d;
close(fd[WRITE]);
read(fd[READ], &d, sizeof(d));
close(fd[READ]);
total = total + d;
}

你不能关闭循环中的 fd[WRITE]:考虑下一个迭代中,当你分叉下一个进程时会发生什么。下一个循环中的子进程将尝试写入已关闭的文件描述符,因此会发生错误(并且 write(2)EBADF中失败,但您从未检查 write(2)的返回值,因此代码很高兴地忽略了该错误)。另外,您尝试一次又一次地关闭 fd[WRITE],因此 close(2)也将返回一个错误(您再次忽略)。
类似地,对于 read(2):如果关闭 fd[READ],则无法在下一次迭代中从管道中读取结果; read(2)将返回一个错误并返回 close(2)
(所以教训是:不要忽略错误。如果你能正确地处理错误,你就能很清楚地知道出了什么问题)
你不需要关门。子进程向管道准确写入 workers整数;父进程从管道准确读取 workers整数,因此这就足够了:
for (chNum = 0; chNum < workers; chNum++) {

c = 0;
pid = fork();

if (pid == -1)
fail("Can't create child process");

if (pid == 0) { // it's a child

for (int i = chNum; i < ptCount; i += workers) {
for (int j = i + 1; j < ptCount; j++) {
// Check the squared distance.
int dx = ptList[i].x - ptList[j].x;
int dy = ptList[i].y - ptList[j].y;
if (dx*dx + dy*dy <= dsq) {
c++;
}
}
}

ssize_t written = write(fd[WRITE], &c, sizeof(c));
if (written == -1)
perror("write error");
if (written != sizeof(c))
fail("Write failed on pipe");

exit(0);
}
else {
int d;
if (read(fd[READ], &d, sizeof(d)) != sizeof(d))
fail("Read error on pipe");
total += d;
}

}

关键是要明白,只要您计划使用管道的新流程分叉,就需要保持 fd[READ]fd[WRITE]打开。
现在,这解决了问题,但是您得到了一个错误的并行性:如果没有可用的数据,管道中的读取将在默认情况下阻塞。这意味着在每次迭代中,父项都不会取得进展,直到相应的子项写入管道。所以你并没有真正的并行化任何东西;效果与使用父fork相同,等待子fork终止,读取结果并将其添加到total,然后派生下一个子fork(并重复循环)。
如果您想要真正的并行性,您必须分叉每个进程,然后才开始从管道中读取。像这样的:
for (chNum = 0; chNum < workers; chNum++) {

c = 0;
pid = fork();

if (pid == -1)
fail("Can't create child process");

if (pid == 0) { // it's a child

for (int i = chNum; i < ptCount; i += workers) {
for (int j = i + 1; j < ptCount; j++) {
// Check the squared distance.
int dx = ptList[i].x - ptList[j].x;
int dy = ptList[i].y - ptList[j].y;
if (dx*dx + dy*dy <= dsq) {
c++;
}
}
}

ssize_t written = write(fd[WRITE], &c, sizeof(c));
if (written == -1)
perror("write error");
if (written != sizeof(c))
fail("Write failed on pipe");

exit(0);
}
}



if (close(fd[WRITE]) < 0)
fail("Error closing pipe's write channel");

int d;
ssize_t r;
while ((r = read(fd[READ], &d, sizeof(d))) > 0) {
if (r != sizeof(d))
fail("read error");
total += d;
}

注意,在这里,我们必须在开始读取之前显式地关闭管道的写入通道;这是为了避免在没有更多的子进程正在主动写入管道时挂起父进程。请记住,只要至少有一个进程的管道的写通道打开,读就会阻塞。如果父进程保持写通道打开, read(2)将永远不会返回,因为父进程本身有可能向管道写入数据(即使我们知道它不会)。所以我们必须关闭 fd[WRITE]
或者,由于我们知道要从管道中读取的数字正好 workers,所以我们可以在循环之后执行此操作,而不是关闭写入通道:
int d;
int i;
for (i = 0; i < workers; i++) {
if (read(fd[READ], &d, sizeof(d)) != sizeof(d))
fail("Failed to read from pipe");
total += d;
}

其他一些(不相关的)评论:
给出错误参数时的错误消息与代码不一致。代码显示 distanceargv[1]中, workersargv[2]中,但是传递给 fail()的错误消息似乎表明 distanceargv[2]中。
argv[1]被解析为整数的两倍:with atoi(3)和with sscanf(3)。我坚持使用 sscanf(3),因为您可以检查返回值以确保解析成功。
workers未验证,并使用 atoi(3)转换。忽略错误。我建议用 sscanf(3)来解析它,就像用 distance一样,并确保它成功。
存储a pid的正确类型是 pid_t,而不是 int。请使用正确的类型(除 sys/types.h外,您可能还必须包含 unistd.h)。
这是最终版本,所有这些都整理好了:
int main(int argc, char *argv[]) {
int distance;
int workers;

if (argc != 3 || sscanf(argv[1], "%d", &distance) != 1 || sscanf(argv[2], "%d", &workers) != 1)
fail("usage: <distance> <workers>");

readPoints();

// Compute the square of the distance bound, since that's what we'll
// need to compare against.
int dsq = distance * distance;
// Count up the number of nearby pairs of points.
int total = 0;

int fd[2]; // pipe
if (pipe(fd) != 0)
fail("Can't create pipe");

pid_t pid;
int chNum; // child's number
int c;

for (chNum = 0; chNum < workers; chNum++) {

c = 0;
pid = fork();

if (pid == -1)
fail("Can't create child process");

if (pid == 0) { // it's a child

for (int i = chNum; i < ptCount; i += workers) {
for (int j = i + 1; j < ptCount; j++) {
// Check the squared distance.
int dx = ptList[i].x - ptList[j].x;
int dy = ptList[i].y - ptList[j].y;
if (dx*dx + dy*dy <= dsq) {
c++;
}
}
}

ssize_t written = write(fd[WRITE], &c, sizeof(c));
if (written == -1)
perror("write error");
if (written != sizeof(c))
fail("Write failed on pipe");

exit(0);
}
}

if (close(fd[WRITE]) < 0)
fail("Error closing pipe's write channel");

int d;
ssize_t r;
while ((r = read(fd[READ], &d, sizeof(d))) > 0) {
if (r != sizeof(d))
fail("read error");
total += d;
}


printf("Total: %d\n", total);

return 0;
}

关于c - 使用多个进程读取文件并通过pipe()发送数字,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32408161/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com