gpt4 book ai didi

c - mqueue接收到错误的数据

转载 作者:行者123 更新时间:2023-12-04 10:57:36 26 4
gpt4 key购买 nike

以下是处理器养殖作业的代码。重点是“HERE $resp is always the same/different”的注释。这是我的问题:当 worker 进程完成它的工作并将响应数据发送给 farmer 时,farmer 总是收到相同的响应数据(相同的指针地址),即使 worker 每次发送不同的数据也是如此。

示例:工作人员在地址发送数据:0x7fff42318a900x7ffddba973900x7ffc69e8e060 等,而 farmer 只从一个地址接收数据 0x7ffdb1496f30

我已尽最大努力尽可能地抽象代码和问题。如果我遗漏了重要信息,请告诉我,我是流程管理编程的新手,我可以使用一些指导。

UPDATE:同时打印 resp s.a resp.b 的内容,其中 b 是一个整数,返回相同的值,即使 worker 中的值不同。

更新:我尝试编写一些可运行的代码只是这次工作人员可能没有收到。

//在农民和 worker 中

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq

typedef struct{

int a;

} REQUEST;

typedef struct{

int b;

} RESPONSE;

static char mq_farmer[80];
static char mq_worker[80];

//农民:

int main (int argc, char * argv[])
{

REQUEST req;
RESPONSE resp;

sprintf (mq_farmer, "/mq_request_%s_%d", "foo", getpid());
sprintf (mq_worker, "/mq_response_%s_%d", "bar", getpid());

//define attr
struct mq_attr attr;

attr.mq_maxmsg= 10;

attr.mq_msgsize = sizeof(REQUEST);
mqd_t reqQueue = mq_open(mq_farmer, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

attr.mq_msgsize = sizeof(RESPONSE);
mqd_t respQueue = mq_open(mq_worker, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

// * create the child processes (see process_test() and message_queue_test())
int i;
for(i = 0; i < 3; i++)
{
pid_t processID = fork();
if(processID < 0)
{
//error
}

else if(processID == 0)
{
//some code

execlp("./worker","worker", getpid(), i, NULL);
}
}

pid_t pid = fork();


if(pid < 0)
{
//error
}
else
{
if(pid == 0) //receiving done here
{
for(i = 0; i < 3; i++)
{

// read the messages from the worker queue
mqd_t received = mq_receive (respQueue, (char *) &resp, sizeof(resp), NULL);
printf("Farmer received worker response: %p\n with value %d\n", &resp, resp.b);
//HERE &resp is always the same


}

// end worker process
req.a = -1;
mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

}
else //sending done here
{
for(i = 0; i < 3; i++)
{
req.a = i;
mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

}
}


}

waitpid(pid, NULL, 0);
mq_close(reqQueue);
mq_close(respQueue);


//clean up the message queues
mq_unlink(mq_farmer);
mq_unlink(mq_worker);

return 0;
}

// worker :

int main (int argc, char * argv[])
{

REQUEST req;
RESPONSE resp;

int arg1;

sscanf(argv[1], "%d", &arg1);

sprintf (mq_farmer, "/mq_request_%s_%d", "foo", arg1);
sprintf (mq_worker, "/mq_response_%s_%d", "bar",arg1);

mqd_t reqQueue = mq_open (mq_farmer, O_RDONLY);

mqd_t respQueue = mq_open (mq_worker, O_WRONLY);

while (true){

//receiving
mqd_t received = mq_receive (reqQueue, (char *) &req,
sizeof(req), NULL);

printf("Worker received %p with value %d\n", &req, req.a);

//received stop signal
if(req.a < 0){
printf("stopping worker\n");
break;
}

//waiting for farmer to fork
sleep(3);

//do something with request data
resp.b = req.a;

//send response
mqd_t sent = mq_send (respQueue, (char *) &resp,

sizeof (resp), NULL);

printf("Worker sent response: %p\n", &resp);
//HERE &resp is always different (doesn't print)
}

mq_close(reqQueue);
mq_close(respQueue);


//clean up the message queues
mq_unlink(mq_farmer);
mq_unlink(mq_worker);


return 0;
}

最佳答案

当您调用 mq_receive 时,它会将 data 放在第二个参数指向的缓冲区中,您将其指定为 &resp。它不会改变指针本身。

&resp 是父级中的一个固定地址,除非更改它,从发布的代码来看这似乎不太可能[确实显示resp]的定义,所以:

printf("Received worker response: %p\n", &resp);

您将始终获得相同的值。

你 [可能] 想要做的是打印 resp 包含的内容


更新:

好的,还有一些错误。

错误是,虽然您可以为 worker 到农民的消息设置一个队列(即响应队列),但您不能 em> 使用单个队列来处理对工作人员的请求。他们每个人都需要自己的请求队列。

否则,单个工作人员可以吸收/垄断所有请求,即使是属于其他人的请求。如果发生这种情况,农民可能会看到只有那个 worker 的标记消息。

这就是您所看到的,因为第一个工作人员 [可能是 #0] 首先完成了它的 mq_receive。因此,速度如此之快,以至于它在任何其他人可以访问它们之前完成所有 mq_receive/mq_send

然后它将看到“停止”消息并退出。如果其他人“幸运”,则第一个工作人员将剩余的停止消息留在队列中。但是,没有请求消息,因此它们从不发送响应。

此外,响应队列由农民使用 O_WRONLY 而不是 O_RDONLY 打开。

我已经为您的程序制作了两个版本。一个带有错误注释。另一个已清理并正常工作。


这是注释版本[请原谅不必要的样式清理]:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq

typedef struct {
int a;
} REQUEST;

typedef struct {
int b;
} RESPONSE;

char *pgmname;

static char mq_farmer[80];
static char mq_worker[80];

int
main(int argc,char **argv)
{

REQUEST req;
RESPONSE resp;
ssize_t sent;

pgmname = argv[0];

--argc;
++argv;

sprintf(mq_farmer,"/mq_request_%s_%d","foo",getpid());
sprintf(mq_worker,"/mq_response_%s_%d","bar",getpid());

// define attr
// NOTE/BUG: this can have random data in it
struct mq_attr attr;

attr.mq_maxmsg = 10;

// NOTE/BUG: this is _the_ big one -- we're only doing a single request
// queue -- each worker needs its _own_ request queue -- otherwise, a
// single worker can _monopolize_ all messages for the other workers
attr.mq_msgsize = sizeof(REQUEST);
mqd_t reqQueue = mq_open(mq_farmer,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

// NOTE/BUG: this should be opened for reading
attr.mq_msgsize = sizeof(RESPONSE);
mqd_t respQueue = mq_open(mq_worker,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

// create the child processes (see process_test() and message_queue_test())
int i;

// NOTE/BUG: we must remember the child pid numbers so we can do waitpid
// later
for (i = 0; i < 3; i++) {
pid_t processID = fork();

if (processID < 0) {
// error
}

else if (processID == 0) {
// some code

// NOTE/BUG: exec* takes strings so this is wrong
execlp("./worker","worker",getpid(),i,NULL);
}
}

// NOTE/BUG: on all mq_send/mq_receive, the return type is ssize_t and
// _not_ mqd_t

pid_t pid = fork();

if (pid < 0) {
// error
}
else {
// receiving done here
if (pid == 0) {
for (i = 0; i < 3; i++) {

// read the messages from the worker queue
ssize_t received = mq_receive(respQueue,(char *) &resp,
sizeof(resp),NULL);

printf("Farmer received worker response: %p with length %ld value %d\n",
&resp,received,resp.b);
// HERE &resp is always the same
}

// end worker process
req.a = -1;
sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
printf("Farmer sent stop -- sent=%ld\n",sent);

// NOTE/BUG: we need to exit here
}

// sending done here
else {
for (i = 0; i < 3; i++) {
req.a = i;
sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
}
}

}

// NOTE/BUG: we're waiting on the double fork farmer, but _not_
// on the actual worker pids
waitpid(pid,NULL,0);

mq_close(reqQueue);
mq_close(respQueue);

// clean up the message queues
mq_unlink(mq_farmer);
mq_unlink(mq_worker);

return 0;
}

int
worker_main(int argc,char *argv[])
{

REQUEST req;
RESPONSE resp;
ssize_t sent;

int arg1;

// NOTE/BUG: use getppid instead
sscanf(argv[1],"%d",&arg1);
printf("worker: my index is %d ...\n",arg1);

sprintf(mq_farmer,"/mq_request_%s_%d","foo",arg1);
sprintf(mq_worker,"/mq_response_%s_%d","bar",arg1);

mqd_t reqQueue = mq_open(mq_farmer,O_RDONLY);

mqd_t respQueue = mq_open(mq_worker,O_WRONLY);

while (1) {
// receiving
ssize_t received = mq_receive(reqQueue,(char *) &req,
sizeof(req),NULL);

printf("Worker received %p with length %ld value %d\n",
&req,received,req.a);

// received stop signal
if (req.a < 0) {
printf("stopping worker\n");
break;
}

// waiting for farmer to fork
sleep(3);

// do something with request data
resp.b = req.a;

// send response
// NOTE/BUG: last argument is unsigned int and _not_ pointer
#if 0
sent = mq_send(respQueue,(char *) &resp,sizeof(resp),NULL);
#else
sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
#endif

printf("Worker sent response %p with length %ld value %d\n",
&req,sent,req.a);
// HERE &resp is always different (doesn't print)
}

mq_close(reqQueue);
mq_close(respQueue);

// clean up the message queues
// NOTE/BUG: farmer should do this -- not worker
mq_unlink(mq_farmer);
mq_unlink(mq_worker);

return 0;
}

这是清理后的工作版本。请注意,为了简单起见,我将农民和 worker 程序合并为一个程序,在 main 中使用了一些技巧:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq

typedef struct {
int a;
} REQUEST;

typedef struct {
int b;
} RESPONSE;

char *pgmname;
int opt_x;
int opt_W;

#define WORKNR 3

char mqfile_to_farmer[80];
char mqfile_to_worker[80];

struct mq_attr attr;

pid_t ppid;

// per-worker control
struct worker {
pid_t wk_pid;
mqd_t wk_req;
char wk_mqfile[80];
};

struct worker worklist[WORKNR];

void
worker(void)
{

REQUEST req;
RESPONSE resp;
ssize_t sent;

ppid = getppid();

printf("worker: my index is %d ...\n",opt_W);

sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);

mqd_t reqQueue = mq_open(mqfile_to_worker,O_RDONLY);
mqd_t respQueue = mq_open(mqfile_to_farmer,O_WRONLY);

while (1) {
// receiving
errno = 0;
ssize_t received = mq_receive(reqQueue,(char *) &req,
sizeof(req),NULL);

printf("Worker %d received %p with length %ld value %d -- %s\n",
opt_W,&req,received,req.a,strerror(errno));
if (received < 0)
exit(77);

// received stop signal
if (req.a < 0) {
printf("stopping worker\n");
break;
}

// do something with request data
resp.b = req.a;

// send response
errno = 0;
sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);

printf("Worker %d sent response %p with length %ld value %d -- %s\n",
opt_W,&req,sent,req.a,strerror(errno));
// HERE &resp is always different (doesn't print)
if (sent < 0)
exit(78);
}

mq_close(reqQueue);
mq_close(respQueue);

exit(0);
}

void
farmer(void)
{

REQUEST req;
RESPONSE resp;
ssize_t sent;
struct worker *wk;

ppid = getpid();

sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);

attr.mq_maxmsg = 10;

attr.mq_msgsize = sizeof(REQUEST);
mqd_t respQueue = mq_open(mqfile_to_farmer,
O_RDONLY | O_CREAT | O_EXCL,0600,&attr);
if (respQueue < 0) {
printf("farmer: respQueue open fault -- %s\n",strerror(errno));
exit(1);
}

// create the child processes (see process_test() and message_queue_test())
int i;

// create the separate request queues
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
attr.mq_msgsize = sizeof(RESPONSE);
sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,i);
wk->wk_req = mq_open(wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL,0600,
&attr);
if (wk->wk_req < 0) {
printf("farmer: wk_req open fault -- %s\n",strerror(errno));
exit(1);
}
}

for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];

pid_t pid = fork();

if (pid < 0) {
perror("fork");
exit(9);
}

if (pid != 0) {
wk->wk_pid = pid;
continue;
}

// NOTE/FIX: exec* takes strings so this is the correct way
if (opt_x) {
char xid[20];
sprintf(xid,"-W%d",i);
execlp(pgmname,pgmname,xid,NULL);
perror("execlp");
exit(7);
}

// simulate what exec would do -- call it direct
opt_W = i;
worker();
}

pid_t pid = fork();

if (pid < 0) {
perror("fork2");
exit(5);
}

// receiving done here
if (pid == 0) {
for (i = 0; i < WORKNR; i++) {

// read the messages from the worker queue
ssize_t received = mq_receive(respQueue,(char *) &resp,
sizeof(resp),NULL);

printf("Farmer received worker response: %p with length %ld value %d\n",
&resp,received,resp.b);
// HERE &resp is always the same
}

// end worker process
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
req.a = -1;
sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
printf("Farmer sent stop -- sent=%ld\n",sent);
}

// exit the farmer's receiver
printf("farmer: receiver exiting ...\n");
exit(0);
}

// sending done here
else {
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
req.a = i;
sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
}

// wait for farmer's receiver to complete
printf("farmer: waiting for receiver to finish ...\n");
waitpid(pid,NULL,0);
}

mq_close(respQueue);

// wait for all workers to complete
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
printf("farmer: waiting for worker to finish ...\n");
waitpid(wk->wk_pid,NULL,0);
mq_close(wk->wk_req);
mq_unlink(wk->wk_mqfile);
}

// clean up the message queues
mq_unlink(mqfile_to_farmer);
}

int
main(int argc,char **argv)
{
char *cp;

pgmname = argv[0];

--argc;
++argv;

opt_W = -1;

for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;

switch (cp[1]) {
case 'W':
opt_W = atoi(cp + 2);
break;
case 'x':
opt_x = ! opt_x;
break;
}
}

if (opt_W >= 0)
worker();
else
farmer();

return 0;
}

更新#2:

这是一个演示单个请求队列与多个请求队列的版本。工作人员现在检查他们收到的消息中的目标 ID 是否与他们的工作人员编号相匹配。

如果您只是不带任何选项地运行它,您将获得多个队列和“良好”输出。

如果你使用 -b [和可选的 -s] 运行它,你将得到一个单一的请求队列,程序将看到错误路由的消息(例如 worker 0 grabs给工作人员 1 的消息)。

单个队列是一个子集。只要 worker “平等”就可以。但是,如果他们不是(例如,一个 worker 可以做其他人不能做的事情),那么能够排队到正确的 worker 就很重要了。一个例子是网络节点具有其他节点没有的特殊 FPGA 辅助计算硬件,并且某些请求需要这种加速。

此外,单个队列由 workers 进行 self 平衡。这是一种调度形式,但还有其他模型。 (例如,农民希望保留对劳动力分配的控制权)。或者,农民必须停止一名 worker 并让其他 worker 继续工作(例如,停止的系统将断电以进行维护)。

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq

typedef unsigned int u32;

typedef struct {
u32 seqno; // sequence number
int toval; // destination id
int fmval; // responder worker id
} request_t;

char *pgmname;
int opt_b; // 1=broadcast
int opt_i; // 1=ignore errors
int opt_x; // 1=do execlp
int opt_s; // number of ms to sleep
int opt_S; // sequence maximum
int opt_W; // worker xid

#define WORKNR 3
#define MAXMSG 10

char mqfile_to_farmer[80];
mqd_t respQueue;

char mqfile_to_worker[80];
mqd_t reqQueue;

struct mq_attr attr;

pid_t ppid;
pid_t curpid;
pid_t pidrcvr;

// per-worker control
typedef struct {
int wk_xid;
pid_t wk_pid;
mqd_t wk_req;
u32 wk_seqno;
char wk_mqfile[80];
} worker_t;
worker_t worklist[WORKNR];

#define FORALL_WK \
wk = &worklist[0]; wk < &worklist[WORKNR]; ++wk

#define sysfault(_fmt...) \
do { \
printf(_fmt); \
if (ppid) \
kill(ppid,SIGUSR1); \
exit(1); \
} while (0)

void
_sysfault(void)
{

__asm__ __volatile__("" :::);
}

#define logprt(_fmt...) \
do { \
int sverr = errno; \
_logprt(); \
printf(_fmt); \
errno = sverr; \
} while (0)

int logxid;
double logzero;

void
loginit(int xid)
{

logxid = xid;
}

void
_logprt(void)
{
struct timespec ts;
double sec;

clock_gettime(CLOCK_REALTIME,&ts);
sec = ts.tv_nsec;
sec /= 1e9;
sec += ts.tv_sec;

if (logzero == 0)
logzero = sec;

sec -= logzero;

switch (logxid) {
case WORKNR:
printf("%.9f LOG F: ",sec);
break;
case WORKNR + 1:
printf("%.9f LOG R: ",sec);
break;
default:
printf("%.9f LOG W%d: ",sec,logxid);
break;
}
}

void
logexit(int code)
{

exit(code);
}

void
allwait(void)
{
worker_t *wk;

// wait for farmer's receiver to complete
if (pidrcvr) {
logprt("farmer: waiting for receiver to finish ...\n");
waitpid(pidrcvr,NULL,0);
pidrcvr = 0;
}

for (FORALL_WK) {
if (wk->wk_pid) {
logprt("farmer: waiting for worker %d to finish ...\n",wk->wk_xid);
waitpid(wk->wk_pid,NULL,0);
wk->wk_pid = 0;
}

if (opt_b)
continue;

logprt("farmer: closing and removing worker queue ...\n");
mq_close(wk->wk_req);
mq_unlink(wk->wk_mqfile);
}
}

void
sighdr(int signo)
{
worker_t *wk;

switch (signo) {
case SIGUSR1: // request to master
logprt("sighdr: got master stop signal ...\n");

if (pidrcvr)
kill(pidrcvr,SIGUSR2);

for (FORALL_WK) {
if (wk->wk_pid)
kill(wk->wk_pid,SIGUSR2);
}

allwait();
logprt("farmer: abnormal termination\n");

logexit(1);
break;

case SIGUSR2: // request to slaves
logexit(1);
break;
}
}

void
reqopen(mqd_t *fdp,const char *file,int flag)
{
mqd_t fd;
int err;

attr.mq_maxmsg = MAXMSG;
attr.mq_msgsize = sizeof(request_t);

fd = *fdp;
if (fd >= 0)
mq_close(fd);

fd = mq_open(file,flag | O_CREAT,0600,&attr);
if (fd < 0)
sysfault("reqopen: %s open fault -- %s\n",file,strerror(errno));

err = mq_getattr(fd,&attr);
if (err < 0)
sysfault("reqopen: %s getattr fault -- %s\n",file,strerror(errno));

if (attr.mq_msgsize != sizeof(request_t))
sysfault("reqopen: %s size fault -- mq_msgsize=%ld siz=%ld\n",
file,attr.mq_msgsize,sizeof(request_t));

logprt("reqopen: open -- file='%s' fd=%d\n",file,fd);

*fdp = fd;
}

void worker(int execflg);

void
farmer(void)
{
request_t req;
request_t resp;
ssize_t sent;
worker_t *wk;
u32 seqno;
int xid;

ppid = getpid();
curpid = ppid;
loginit(WORKNR);

sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
sprintf(mqfile_to_worker,"/mq_request_%d",ppid);

respQueue = -1;
reqopen(&respQueue,mqfile_to_farmer,O_RDONLY | O_CREAT | O_EXCL);

reqQueue = -1;
if (opt_b)
reqopen(&reqQueue,mqfile_to_worker,O_WRONLY | O_CREAT | O_EXCL);

// create the separate request queues
xid = 0;
for (FORALL_WK) {
wk->wk_xid = xid++;

if (opt_b) {
logprt("farmer: common request queue -- reqQueue=%d\n",reqQueue);
wk->wk_req = reqQueue;
continue;
}

sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,wk->wk_xid);

wk->wk_req = -1;
reqopen(&wk->wk_req,wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL);
logprt("farmer: separate request queue -- wk_req=%d\n",wk->wk_req);
}

// fork the workers
for (FORALL_WK) {
pid_t pid = fork();

if (pid < 0)
sysfault("farmer: fork fault -- %s\n",strerror(errno));

if (pid != 0) {
wk->wk_pid = pid;
continue;
}

// NOTE/FIX: exec* takes strings so this is the correct way
if (opt_x) {
char opt[2][20];

sprintf(opt[0],"-b%d",opt_b);
sprintf(opt[1],"-W%d",wk->wk_xid);

execlp(pgmname,pgmname,opt[0],opt[1],NULL);
sysfault("farmer: execlp error -- %s\n",strerror(errno));
}

// simulate what exec would do -- call it direct
opt_W = wk->wk_xid;
worker(0);
}

pidrcvr = fork();
if (pidrcvr < 0)
sysfault("farmer: fork2 error -- %s\n",strerror(errno));

// receiving done here
if (pidrcvr == 0) {
curpid = getpid();
loginit(WORKNR + 1);

for (int i = 0; i < (WORKNR * opt_S); i++) {
// read the messages from the worker queue
ssize_t received = mq_receive(respQueue,(char *) &resp,
sizeof(resp),NULL);

wk = &worklist[resp.fmval];
logprt("received worker response: length %d fmval=%d seqno=%u wk_seqno=%u\n",
(int) received,resp.fmval,resp.seqno,wk->wk_seqno);

if (received < 0) {
if (! opt_i)
sysfault("farmer: received fault -- %s\n",strerror(errno));
}

if (resp.seqno != wk->wk_seqno) {
logprt("sequence fault\n");
if (! opt_i)
sysfault("farmer: sequence fault\n");
}

++wk->wk_seqno;
}

// send stop to worker processes
for (FORALL_WK) {
req.toval = -1;
sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
logprt("Farmer sent stop -- wk_xid=%d sent=%d\n",
wk->wk_xid,(int) sent);

if (sent < 0) {
if (! opt_i)
sysfault("farmer: send fault on stop -- %s\n",
strerror(errno));
}
}

// exit the farmer's receiver
logprt("farmer: receiver exiting ...\n");
logexit(0);
}

// sending done here
else {
for (seqno = 0; seqno < opt_S; ++seqno) {
for (FORALL_WK) {
wk->wk_seqno = seqno;
req.seqno = seqno;
req.toval = wk->wk_xid;

sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
logprt("Farmer sent to wk_xid=%d wk_req=%d -- sent=%d\n",
wk->wk_xid,wk->wk_req,(int) sent);
if (sent < 0) {
if (! opt_i)
sysfault("farmer: send fault -- %s\n",strerror(errno));
}
}
}
}

mq_close(respQueue);

// wait for all workers to complete
allwait();

// clean up the message queues
mq_unlink(mqfile_to_farmer);

logprt("farmer: complete\n");
logexit(0);
}

void
worker(int execflg)
{
request_t req;
request_t resp;
ssize_t sent;
u32 seqno;
int slpcnt;

if (execflg)
ppid = getppid();
curpid = getpid();

loginit(opt_W);
logprt("worker: my index is %d ...\n",opt_W);

attr.mq_maxmsg = MAXMSG;

sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
reqopen(&respQueue,mqfile_to_farmer,O_WRONLY);

if (opt_b)
sprintf(mqfile_to_worker,"/mq_request_%d",ppid);
else
sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);
reqopen(&reqQueue,mqfile_to_worker,O_RDONLY);

seqno = 0;

slpcnt = opt_s;
slpcnt *= 1000;
slpcnt *= opt_W;

while (1) {
if (slpcnt > 0) {
logprt("sleep %d\n",slpcnt);
usleep(slpcnt);
slpcnt = 0;
}

// receiving
errno = 0;
ssize_t received = mq_receive(reqQueue,(char *) &req,
sizeof(req),NULL);

logprt("received length %d -- seqno=%u toval=%d\n",
(int) received,req.seqno,req.toval);

if (received < 0)
sysfault("worker: mq_receive fault -- %s\n",strerror(errno));

// received stop signal
if (req.toval < 0) {
logprt("stopping ...\n");
break;
}

if (req.toval != opt_W) {
logprt("misroute\n");
if (! opt_i)
sysfault("worker: misroute fault\n");
}

if (req.seqno != seqno) {
logprt("sequence fault\n");
if (! opt_i)
sysfault("worker: sequence fault\n");
}

// do something with request data
resp.seqno = req.seqno;
resp.toval = req.toval;
resp.fmval = opt_W;

// send response
errno = 0;
sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);

logprt("sent response with length %d -- seqno=%u toval=%d\n",
(int) sent,req.seqno,resp.toval);

// HERE &resp is always different (doesn't print)
if (sent < 0)
sysfault("worker: mq_send fault -- %s\n",strerror(errno));

++seqno;
}

mq_close(reqQueue);
mq_close(respQueue);

logexit(0);
}

int
main(int argc,char **argv)
{
char *cp;

pgmname = argv[0];

--argc;
++argv;

opt_W = -1;
opt_S = 3;

reqQueue = -1;
respQueue = -1;

signal(SIGUSR1,sighdr);
signal(SIGUSR2,sighdr);

for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;

switch (cp[1]) {
case 'b': // broadcast mode (single request queue)
cp += 2;
opt_b = (*cp != 0) ? atoi(cp) : 1;
break;

case 'i': // ignore errors
cp += 2;
opt_i = (*cp != 0) ? atoi(cp) : 1;
break;

case 'S': // sequence maximum
cp += 2;
opt_S = (*cp != 0) ? atoi(cp) : 3;
break;

case 's': // sleep mode (milliseconds)
cp += 2;
opt_s = (*cp != 0) ? atoi(cp) : 3;
break;

case 'W': // worker number
cp += 2;
opt_W = atoi(cp + 2);
break;

case 'x': // use execlp
opt_x = ! opt_x;
break;
}
}

if (opt_W >= 0)
worker(1);
else
farmer();

return 0;
}

关于c - mqueue接收到错误的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39668354/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com