dannychris37 dannychris37 - 2 months ago 12
C Question

mqueue receive wrong data

Below is the code for an assignment on processor farming. The focus is on the comments with "HERE &resp is always the same/different". That's my problem: when the worker process does its job and sends the response data to the farmer, the farmer always receives the same response data (the same pointer address), even though the worker sends different data every time.

Example: workers send data at addresses `0x7fff42318a90`, `0x7ffddba97390`, `0x7ffc69e8e060`, etc., and the farmer keeps receiving data from only one address, `0x7ffdb1496f30`.


I've done my best to abstract the code and question as much as possible. If I've omitted important information please let me know, I'm new to process management programming and I could use some guidance.

UPDATE: also, printing the contents of `resp`, such as `resp.b` (where `b` is an integer), returns the same value every time, even though the value is different in the worker.

UPDATE: I tried writing some runnable code; only this time, the worker might not be receiving.

//both in farmer and in worker

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq

// Message sent from the farmer to a worker.
// The worker treats a negative value of `a` as the stop signal.
typedef struct{

int a;

} REQUEST;

// Message sent from a worker back to the farmer.
// The worker fills `b` with the value of the request it handled.
typedef struct{

int b;

} RESPONSE;

// Queue names, built at run time with sprintf:
// mq_farmer holds the "/mq_request_..." name (farmer -> workers),
// mq_worker holds the "/mq_response_..." name (workers -> farmer).
static char mq_farmer[80];
static char mq_worker[80];


//farmer:

// Farmer, as posted in the question (code kept verbatim).
// Creates one request queue and one response queue whose names contain the
// farmer's pid, forks three children that exec ./worker, then forks once
// more: the child of that fork reads three responses and sends one stop
// request, while the parent sends three requests.
// Always returns 0; no call in this listing has its result checked.
int main (int argc, char * argv[])
{

REQUEST req;
RESPONSE resp;

sprintf (mq_farmer, "/mq_request_%s_%d", "foo", getpid());
sprintf (mq_worker, "/mq_response_%s_%d", "bar", getpid());

//define attr
struct mq_attr attr;

attr.mq_maxmsg= 10;

attr.mq_msgsize = sizeof(REQUEST);
mqd_t reqQueue = mq_open(mq_farmer, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

// NOTE(review): this descriptor is opened write-only, yet it is the one
// passed to mq_receive below.
attr.mq_msgsize = sizeof(RESPONSE);
mqd_t respQueue = mq_open(mq_worker, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

// * create the child processes (see process_test() and message_queue_test())
int i;
for(i = 0; i < 3; i++)
{
pid_t processID = fork();
if(processID < 0)
{
//error
}

else if(processID == 0)
{
//some code

// NOTE(review): execlp expects char * arguments; getpid() and i are
// integers here. getpid() is also evaluated in the child, so it is the
// child's pid, not the pid used in the queue names above.
execlp("./worker","worker", getpid(), i, NULL);
}
}

pid_t pid = fork();


if(pid < 0)
{
//error
}
else
{
if(pid == 0) //receiving done here
{
for(i = 0; i < 3; i++)
{

// read the messages from the worker queue
mqd_t received = mq_receive (respQueue, (char *) &resp, sizeof(resp), NULL);
// &resp is the address of this process's local buffer, so the %p output
// cannot change between iterations; only resp.b reflects the message.
printf("Farmer received worker response: %p\n with value %d\n", &resp, resp.b);
//HERE &resp is always the same


}

// end worker process
// NOTE(review): a single stop request is sent although three workers
// were started on the same request queue.
req.a = -1;
mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

}
else //sending done here
{
for(i = 0; i < 3; i++)
{
req.a = i;
mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

}
}


}

// NOTE(review): the receiving child does not exit above, so it also reaches
// this point and runs the close/unlink calls; the worker pids are not waited
// for.
waitpid(pid, NULL, 0);
mq_close(reqQueue);
mq_close(respQueue);


//clean up the message queues
mq_unlink(mq_farmer);
mq_unlink(mq_worker);

return 0;
}


//worker:

// Worker, as posted in the question (code kept verbatim).
// Parses argv[1] as an integer, uses it to rebuild the two queue names,
// opens the request queue read-only and the response queue write-only, and
// then loops: receive a request, stop if req.a is negative, otherwise sleep
// three seconds, copy req.a into resp.b and send the response.
// Always returns 0; no call in this listing has its result checked.
int main (int argc, char * argv[])
{

REQUEST req;
RESPONSE resp;

int arg1;

// arg1 must equal the pid the farmer used when it named the queues
sscanf(argv[1], "%d", &arg1);

sprintf (mq_farmer, "/mq_request_%s_%d", "foo", arg1);
sprintf (mq_worker, "/mq_response_%s_%d", "bar",arg1);

mqd_t reqQueue = mq_open (mq_farmer, O_RDONLY);

mqd_t respQueue = mq_open (mq_worker, O_WRONLY);

while (true){

//receiving
mqd_t received = mq_receive (reqQueue, (char *) &req,
sizeof(req), NULL);

// &req is the address of this process's local buffer, not of the message
printf("Worker received %p with value %d\n", &req, req.a);

//received stop signal
if(req.a < 0){
printf("stopping worker\n");
break;
}

//waiting for farmer to fork
sleep(3);

//do something with request data
resp.b = req.a;

//send response
// NOTE(review): the last argument of mq_send is the message priority;
// NULL is passed where an unsigned integer is expected.
mqd_t sent = mq_send (respQueue, (char *) &resp,

sizeof (resp), NULL);

printf("Worker sent response: %p\n", &resp);
//HERE &resp is always different (doesn't print)
}

mq_close(reqQueue);
mq_close(respQueue);


//clean up the message queues
// NOTE(review): the worker unlinks queues that the farmer created and
// also unlinks itself.
mq_unlink(mq_farmer);
mq_unlink(mq_worker);


return 0;
}

Answer

When you call mq_receive it places the data at the buffer pointed to by the second argument, which you give as &resp. It does not change the pointer itself.

&resp is a fixed address in the parent, unless you change it, which appears unlikely from the posted code [which does not show the definition of resp], so:

printf("Received worker response: %p\n", &resp);

You will always get the same value.

What you [probably] want to do is print what resp contains (e.g. resp.b), not its address.


UPDATE:

Okay, there were a few more bugs.

The big bug is that while you can have one queue for worker-to-farmer messages (i.e. the response queue), you cannot use a single queue for requests to workers. They each need their own request queue.

Otherwise, a single worker can absorb/monopolize all requests, even ones that belong to others. If that happened, the farmer would likely see messages that were stamped from only that worker.

This is what you're seeing because the first worker [probably #0] has its mq_receive complete first. It is then so fast that it does all of the mq_receive/mq_send calls before any of the others can get to them.

It will then see a "stop" message and exit. If the others are "lucky", the first worker left the remaining stop messages in the queue. But, no request messages, so they never send a response.

Also, the response queue was opened by the farmer with O_WRONLY instead of O_RDONLY.

I've produced two versions of your program. One with annotations for bugs. Another that is cleaned up and working.


Here's the annotated version [please pardon the gratuitous style cleanup]:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

// Message sent from the farmer to a worker; a negative `a` means "stop".
typedef struct {
    int a;
} REQUEST;

// Message sent from a worker back to the farmer; `b` echoes the request value.
typedef struct {
    int b;
} RESPONSE;

// argv[0] of the running program (set in main)
char *pgmname;

// Queue names built with sprintf: mq_farmer is the "/mq_request_..." name,
// mq_worker is the "/mq_response_..." name.
static char mq_farmer[80];
static char mq_worker[80];

// Farmer from the question, reformatted and annotated.
// The logic is intentionally left as the asker wrote it; every NOTE/BUG
// comment marks a defect at the line that follows it.
// Flow: create one request queue and one response queue, fork three
// children that exec ./worker, then fork again so that the child reads
// responses and sends a stop request while the parent sends three requests.
int
main(int argc,char **argv)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    pgmname = argv[0];

    --argc;
    ++argv;

    sprintf(mq_farmer,"/mq_request_%s_%d","foo",getpid());
    sprintf(mq_worker,"/mq_response_%s_%d","bar",getpid());

    // define attr
    // NOTE/BUG: this can have random data in it (only mq_maxmsg and
    // mq_msgsize are assigned below)
    struct mq_attr attr;

    attr.mq_maxmsg = 10;

    // NOTE/BUG: this is _the_ big one -- we're only doing a single request
    // queue -- each worker needs its _own_ request queue -- otherwise, a
    // single worker can _monopolize_ all messages for the other workers
    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t reqQueue = mq_open(mq_farmer,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

    // NOTE/BUG: this should be opened for reading
    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mq_worker,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

    // create the child processes (see process_test() and message_queue_test())
    int i;

    // NOTE/BUG: we must remember the child pid numbers so we can do waitpid
    // later
    for (i = 0; i < 3; i++) {
        pid_t processID = fork();

        if (processID < 0) {
            // error
        }

        else if (processID == 0) {
            // some code

            // NOTE/BUG: exec* takes strings so this is wrong
            execlp("./worker","worker",getpid(),i,NULL);
        }
    }

    // NOTE/BUG: on all mq_send/mq_receive, the return type is _not_ mqd_t:
    // mq_receive returns ssize_t and mq_send returns int

    pid_t pid = fork();

    if (pid < 0) {
        // error
    }
    else {
        // receiving done here
        if (pid == 0) {
            for (i = 0; i < 3; i++) {

                // read the messages from the worker queue
                ssize_t received = mq_receive(respQueue,(char *) &resp,
                    sizeof(resp),NULL);

                // &resp is the address of the local buffer, so it never
                // changes; the message content is in resp.b
                printf("Farmer received worker response: %p with length %ld value %d\n",
                    &resp,received,resp.b);
                // HERE &resp is always the same
            }

            // end worker process
            req.a = -1;
            sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
            printf("Farmer sent stop -- sent=%ld\n",sent);

            // NOTE/BUG: we need to exit here
        }

        // sending done here
        else {
            for (i = 0; i < 3; i++) {
                req.a = i;
                sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
                printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
            }
        }

    }

    // NOTE/BUG: we're waiting on the double fork farmer, but _not_
    // on the actual worker pids
    waitpid(pid,NULL,0);

    mq_close(reqQueue);
    mq_close(respQueue);

    // clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

// Worker from the question, reformatted and annotated (renamed from main so
// that it can sit in the same listing as the farmer).
// The logic is intentionally left as the asker wrote it; every NOTE/BUG
// comment marks a defect at the line that follows it.
// Flow: parse argv[1], rebuild the queue names from it, open both queues,
// then echo each request value back until a negative request arrives.
int
worker_main(int argc,char *argv[])
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    int arg1;

    // NOTE/BUG: use getppid instead
    sscanf(argv[1],"%d",&arg1);
    // NOTE(review): arg1 is used below as the pid part of the queue names,
    // although this message calls it an index
    printf("worker: my index is %d ...\n",arg1);

    sprintf(mq_farmer,"/mq_request_%s_%d","foo",arg1);
    sprintf(mq_worker,"/mq_response_%s_%d","bar",arg1);

    mqd_t reqQueue = mq_open(mq_farmer,O_RDONLY);

    mqd_t respQueue = mq_open(mq_worker,O_WRONLY);

    while (1) {
        // receiving
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);

        printf("Worker received %p with length %ld value %d\n",
            &req,received,req.a);

        // received stop signal
        if (req.a < 0) {
            printf("stopping worker\n");
            break;
        }

        // waiting for farmer to fork
        sleep(3);

        // do something with request data
        resp.b = req.a;

        // send response
        // NOTE/BUG: last argument is unsigned int and _not_ pointer
#if 0
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),NULL);
#else
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
#endif

        // NOTE(review): this reports &req and req.a although the message
        // just sent is resp
        printf("Worker sent response %p with length %ld value %d\n",
            &req,sent,req.a);
        // HERE &resp is always different (doesn't print)
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    // clean up the message queues
    // NOTE/BUG: farmer should do this -- not worker
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

Here's the cleaned up and working version. Note that, for ease/simplicity, I combined both the farmer and worker programs into a single one, using a little bit of trickery in main:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

// Message sent from the farmer to a worker; a negative `a` means "stop".
typedef struct {
    int a;
} REQUEST;

// Message sent from a worker back to the farmer; `b` echoes the request value.
typedef struct {
    int b;
} RESPONSE;

// argv[0] of the running program; the farmer re-executes it when opt_x is set
char *pgmname;
// -x: toggled by each occurrence; when non-zero the farmer starts workers
// via execlp instead of calling worker() directly in the forked child
int opt_x;
// -W<n>: worker index; main sets it to -1 by default, and a value >= 0
// selects worker mode
int opt_W;

// number of worker processes (and per-worker request queues)
#define WORKNR      3

// name of the response queue (workers -> farmer)
char mqfile_to_farmer[80];
// name of the request queue a worker reads from (farmer -> that worker)
char mqfile_to_worker[80];

// queue attributes used by the farmer when creating queues; being a global,
// it starts out zero-initialized
struct mq_attr attr;

// pid of the farmer: getpid() in the farmer, getppid() in a worker
pid_t ppid;

// per-worker control
struct worker {
    pid_t wk_pid;           // pid returned by fork for this worker
    mqd_t wk_req;           // farmer's write descriptor for the request queue
    char wk_mqfile[80];     // name of this worker's request queue
};

// one entry per worker, indexed by worker number
struct worker worklist[WORKNR];

// worker -- body of one worker process.
//
// Opens this worker's request queue "/mq_request_<farmer pid>_<opt_W>" for
// reading and the shared response queue "/mq_response_<farmer pid>" for
// writing, then echoes each request value back as a response until a
// request with a negative value arrives. The queues are only opened here,
// never created, so they must already exist.
//
// Reads the global opt_W (this worker's index) and overwrites the globals
// ppid, mqfile_to_farmer and mqfile_to_worker.
//
// Never returns. Exit status: 0 after a stop request, 76 if a queue cannot
// be opened, 77 if mq_receive fails, 78 if mq_send fails.
void
worker(void)
{
    REQUEST req;
    RESPONSE resp;

    // the queue names are keyed on the farmer's pid, which is our parent
    ppid = getppid();

    printf("worker: my index is %d ...\n",opt_W);

    snprintf(mqfile_to_farmer,sizeof(mqfile_to_farmer),
        "/mq_response_%ld",(long) ppid);
    snprintf(mqfile_to_worker,sizeof(mqfile_to_worker),
        "/mq_request_%ld_%d",(long) ppid,opt_W);

    // fail loudly here instead of letting mq_receive trip over a bad
    // descriptor later
    mqd_t reqQueue = mq_open(mqfile_to_worker,O_RDONLY);
    if (reqQueue == (mqd_t) -1) {
        printf("worker %d: reqQueue open fault (%s) -- %s\n",
            opt_W,mqfile_to_worker,strerror(errno));
        exit(76);
    }

    mqd_t respQueue = mq_open(mqfile_to_farmer,O_WRONLY);
    if (respQueue == (mqd_t) -1) {
        printf("worker %d: respQueue open fault (%s) -- %s\n",
            opt_W,mqfile_to_farmer,strerror(errno));
        mq_close(reqQueue);
        exit(76);
    }

    while (1) {
        // receiving
        errno = 0;
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);
        if (received < 0) {
            // req holds no valid data in this case, so don't print it
            printf("Worker %d receive fault -- %s\n",opt_W,strerror(errno));
            exit(77);
        }

        printf("Worker %d received %p with length %zd value %d -- %s\n",
            opt_W,(void *) &req,received,req.a,strerror(errno));

        // received stop signal
        if (req.a < 0) {
            printf("stopping worker\n");
            break;
        }

        // do something with request data
        resp.b = req.a;

        // send response -- mq_send returns 0 or -1, not a length
        errno = 0;
        int rc = mq_send(respQueue,(char *) &resp,sizeof(resp),0);

        printf("Worker %d sent response %p with value %d (rc=%d) -- %s\n",
            opt_W,(void *) &resp,resp.b,rc,strerror(errno));
        if (rc < 0)
            exit(78);
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    exit(0);
}

// farmer -- create the queues, start WORKNR workers, give each worker one
// request, collect the responses and shut everything down.
//
// Queues (names are keyed on the farmer's pid):
//   "/mq_response_<pid>"      workers -> farmer, carries RESPONSE messages
//   "/mq_request_<pid>_<i>"   farmer -> worker i, carries REQUEST messages
// Each worker gets its own request queue so that no worker can consume a
// request (or a stop message) meant for another one.
//
// Process layout: the farmer forks WORKNR workers, then forks a "receiver"
// child. The receiver reads the responses and afterwards sends every worker
// a stop request (a == -1); the farmer itself sends the work requests and
// then waits for the receiver and for all workers.
//
// Uses the globals attr, ppid, mqfile_to_farmer, worklist, opt_x and
// pgmname. Exits with 1 if a queue cannot be created, 9 or 5 if fork fails.
void
farmer(void)
{
    REQUEST req;
    RESPONSE resp;
    struct worker *wk;
    int i;
    int rc;

    ppid = getpid();

    snprintf(mqfile_to_farmer,sizeof(mqfile_to_farmer),
        "/mq_response_%ld",(long) ppid);

    attr.mq_maxmsg = 10;

    // the response queue carries RESPONSE messages
    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mqfile_to_farmer,
        O_RDONLY | O_CREAT | O_EXCL,0600,&attr);
    if (respQueue == (mqd_t) -1) {
        printf("farmer: respQueue open fault -- %s\n",strerror(errno));
        exit(1);
    }

    // create the separate request queues -- these carry REQUEST messages
    attr.mq_msgsize = sizeof(REQUEST);
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];
        snprintf(wk->wk_mqfile,sizeof(wk->wk_mqfile),
            "/mq_request_%ld_%d",(long) ppid,i);
        wk->wk_req = mq_open(wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL,0600,
            &attr);
        if (wk->wk_req == (mqd_t) -1) {
            printf("farmer: wk_req open fault -- %s\n",strerror(errno));

            // message queues outlive the process, so remove the ones that
            // were already created before giving up
            while (--i >= 0) {
                mq_close(worklist[i].wk_req);
                mq_unlink(worklist[i].wk_mqfile);
            }
            mq_close(respQueue);
            mq_unlink(mqfile_to_farmer);
            exit(1);
        }
    }

    // create the worker processes
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];

        pid_t child = fork();

        if (child < 0) {
            perror("fork");
            exit(9);
        }

        // parent: remember the pid so we can waitpid on it later
        if (child != 0) {
            wk->wk_pid = child;
            continue;
        }

        // NOTE/FIX: exec* takes strings so this is the correct way
        if (opt_x) {
            char xid[20];
            snprintf(xid,sizeof(xid),"-W%d",i);
            execlp(pgmname,pgmname,xid,(char *) NULL);
            perror("execlp");
            exit(7);
        }

        // simulate what exec would do -- call it direct (does not return)
        opt_W = i;
        worker();
    }

    pid_t pid = fork();

    if (pid < 0) {
        perror("fork2");
        exit(5);
    }

    // receiving done here
    if (pid == 0) {
        int status = 0;

        for (i = 0; i < WORKNR; i++) {

            // read the messages from the worker queue
            ssize_t received = mq_receive(respQueue,(char *) &resp,
                sizeof(resp),NULL);
            if (received < 0) {
                // resp holds no valid data -- stop collecting, but still
                // tell the workers to stop below
                printf("farmer: mq_receive fault -- %s\n",strerror(errno));
                status = 1;
                break;
            }

            // &resp is the address of our local buffer and never changes;
            // the message content is resp.b
            printf("Farmer received worker response: %p with length %zd value %d\n",
                (void *) &resp,received,resp.b);
        }

        // end worker processes -- one stop message per request queue
        for (i = 0; i < WORKNR; i++) {
            wk = &worklist[i];
            req.a = -1;
            rc = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            printf("Farmer sent stop -- sent=%d\n",rc);
        }

        // exit the farmer's receiver
        printf("farmer: receiver exiting ...\n");
        exit(status);
    }

    // sending done here
    else {
        for (i = 0; i < WORKNR; i++) {
            wk = &worklist[i];
            req.a = i;
            rc = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            printf("Farmer sent to i=%d -- sent=%d\n",i,rc);
            if (rc < 0)
                printf("farmer: mq_send fault -- %s\n",strerror(errno));
        }

        // wait for farmer's receiver to complete
        printf("farmer: waiting for receiver to finish ...\n");
        waitpid(pid,NULL,0);
    }

    mq_close(respQueue);

    // wait for all workers to complete
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];
        printf("farmer: waiting for worker to finish ...\n");
        waitpid(wk->wk_pid,NULL,0);
        mq_close(wk->wk_req);
        mq_unlink(wk->wk_mqfile);
    }

    // clean up the message queues
    mq_unlink(mqfile_to_farmer);
}

// Entry point shared by the farmer and the workers.
//
// Leading arguments that start with '-' are treated as options; scanning
// stops at the first argument that does not:
//   -W<n>  run as worker number <n> (parsed with atoi)
//   -x     toggle opt_x
// Any other option letter is ignored. The process runs worker() when a
// non-negative worker index was given, and farmer() otherwise.
int
main(int argc,char **argv)
{
    int idx;

    pgmname = argv[0];
    opt_W = -1;

    for (idx = 1;  idx < argc;  ++idx) {
        char *arg = argv[idx];

        // first non-option argument ends option processing
        if (arg[0] != '-')
            break;

        if (arg[1] == 'W')
            opt_W = atoi(arg + 2);
        else if (arg[1] == 'x')
            opt_x = ! opt_x;
    }

    if (opt_W < 0)
        farmer();
    else
        worker();

    return 0;
}