mqueue接收到错误的数据

mqueue receive wrong data

以下是一个处理器农场(processor farm)作业的代码。重点是带有 "HERE &resp is always the same/different" 的注释。这就是我的问题:当 worker 进程完成工作并将响应数据发送给 farmer 时,farmer 总是收到相同的响应数据(相同的指针地址),即使 worker 每次发送的数据都不同。

示例:worker 在以下地址发送数据:0x7fff42318a90、0x7ffddba97390、0x7ffc69e8e060 等,而 farmer 只从一个地址接收数据:0x7ffdb1496f30。

我已经尽力将代码和问题尽可能地抽象出来。如果我遗漏了重要信息,请告诉我,我是流程管理编程的新手,我可以使用一些指导。

更新:同时打印 resp 的内容(例如 resp.b,其中 b 是一个整数)也返回相同的值,即使 worker 中的值不同。

更新:我刚刚尝试编写了一些可运行的代码,这一次 worker 可能根本没有收到消息。

// 农民(farmer)和工人(worker)共用的部分:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>         // for execlp
#include <mqueue.h>         // for mq

typedef struct{

    int a;

} REQUEST;

typedef struct{

    int b;

} RESPONSE;

static char mq_farmer[80];
static char mq_worker[80];

//农民:

/*
 * Farmer entry point (code as posted in the question).
 *
 * Creates one request queue and one response queue whose names embed the
 * farmer's pid, forks three children that exec "./worker", then forks once
 * more: that child reads three responses and sends a single stop request,
 * while the parent sends three requests.
 *
 * NOTE(review): no return value of mq_open/mq_send/mq_receive/fork is
 * checked anywhere in this function.
 */
int main (int argc, char * argv[])
{

    REQUEST req;
    RESPONSE resp;

    /* both queue names are derived from this process's pid */
    sprintf (mq_farmer, "/mq_request_%s_%d", "foo", getpid());
    sprintf (mq_worker, "/mq_response_%s_%d", "bar", getpid());

    //define attr
    // NOTE(review): only mq_maxmsg and mq_msgsize are assigned; the other
    // members of this automatic struct are left uninitialised
    struct mq_attr attr;

    attr.mq_maxmsg= 10;

    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t reqQueue = mq_open(mq_farmer, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

    // NOTE(review): this queue is opened write-only, yet mq_receive() is
    // called on it further down
    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mq_worker, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

    //  * create the child processes (see process_test() and message_queue_test())
    int i;
    for(i = 0; i < 3; i++)
        {
            pid_t processID = fork();
            if(processID < 0)
                {
                    //error
                }

            else if(processID == 0)
                {
                    //some code

                    // NOTE(review): getpid() and i are passed as integers here,
                    // while the worker parses argv[1] as a string with sscanf();
                    // also, getpid() in the child is the child's own pid
                    execlp("./worker","worker", getpid(), i, NULL);
                }
        }

    // second fork: the child becomes the receiver, the parent the sender
    pid_t pid = fork();


    if(pid < 0)
        {
            //error
        }
    else
        {
            if(pid == 0) //receiving done here
                {
                    for(i = 0; i < 3; i++)
                        {

                            // read the messages from the worker queue
                            mqd_t received = mq_receive (respQueue, (char *) &resp, sizeof(resp), NULL);
                            // &resp is the address of the local buffer, so it is the
                            // same on every iteration; the message content is resp.b
                            printf("Farmer received worker response: %p\n with value %d\n", &resp, resp.b);
                            //HERE &resp is always the same


                        }

                    // end worker process
                    // (a single stop request is sent on the shared request queue)
                    req.a = -1;
                    mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

                    // no exit here: the receiver child falls through to the
                    // waitpid/close/unlink code below as well
                }
            else //sending done here
                {
                    for(i = 0; i < 3; i++)
                        {
                            req.a = i;
                            mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

                        }
                }


        }

    // only the second fork's pid is waited for; the worker pids were not kept
    waitpid(pid, NULL, 0);
    mq_close(reqQueue);
    mq_close(respQueue);


    //clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

//工人:

/*
 * Worker entry point (code as posted in the question).
 *
 * Parses an integer from argv[1], uses it to build the two queue names,
 * then loops: receive a request, stop if its value is negative, otherwise
 * sleep, copy the value into a response and send it back.
 *
 * NOTE(review): argv[1] is used without checking argc, and no return value
 * of sscanf/mq_open/mq_receive/mq_send is checked.
 */
int main (int argc, char * argv[])
{

    REQUEST req;
    RESPONSE resp;

    int arg1;

    // arg1 is the number embedded in the queue names
    sscanf(argv[1], "%d", &arg1);

    sprintf (mq_farmer, "/mq_request_%s_%d", "foo", arg1);
    sprintf (mq_worker, "/mq_response_%s_%d", "bar",arg1);

    // requests are read from mq_farmer, responses are written to mq_worker
    mqd_t reqQueue = mq_open (mq_farmer, O_RDONLY);

    mqd_t respQueue = mq_open (mq_worker, O_WRONLY);

    while (true){

        //receiving
        mqd_t received = mq_receive (reqQueue, (char *) &req,
                                     sizeof(req), NULL);

        // &req is the address of the local buffer; the payload is req.a
        printf("Worker received %p with value %d\n", &req, req.a);

        //received stop signal
        if(req.a < 0){
            printf("stopping worker\n");
            break;
        }

        //waiting for farmer to fork
        sleep(3);

        //do something with request data
        resp.b = req.a;

        //send response
        // NOTE(review): the last argument (message priority) is given as NULL
        mqd_t sent = mq_send (respQueue, (char *) &resp,

                              sizeof (resp), NULL);

        printf("Worker sent response: %p\n", &resp);
        //HERE &resp is always different (doesn't print)
    }

    mq_close(reqQueue);
    mq_close(respQueue);


    //clean up the message queues
    // (the farmer also unlinks these same two names)
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);


    return 0;
}

当您调用 mq_receive 时,它会将 数据 放置在第二个参数指向的缓冲区中,您将其指定为 &resp。它不会改变指针本身。

&resp 在父进程中是一个固定的地址,除非你自己去改变它(从已发布的代码来看这不太可能——请贴出 resp 的定义),因此:

printf("Received worker response: %p\n", &resp);

您将始终获得相同的值。

您[可能]想要做的是打印 resp 包含的内容


更新:

好的,还有一些错误。

错误在于:虽然您可以只用一个队列来传递工人发给农民的消息(即响应队列),但您不能用单个队列来处理发给工人的请求。每个工人都需要自己的请求队列。

否则,一个工人可能吞掉(独占)所有请求,包括本属于其他工人的请求。如果发生这种情况,农民看到的很可能只是由那一个工人标记的消息。

这就是您所看到的现象:第一个工人[可能是 #0]最先完成它的 mq_receive,而且速度很快,以至于在其他工人到达之前,它就完成了所有的 mq_receive/mq_send。

然后它会看到 "stop" 消息并退出。如果其他工人"走运",第一个工人会把剩余的停止消息留在队列中。但是,队列中已经没有请求消息了,因此它们永远不会发送响应。

此外,响应队列是由农民用 O_WRONLY 而不是 O_RDONLY 打开的。

我已经制作了你程序的两个版本。一个带有错误注释。另一个已清理并正常工作。


这是注释版本[请原谅无偿的样式清理]:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

// Request message (farmer -> worker); a negative `a` is the stop signal.
typedef struct {
    int a;
} REQUEST;

// Response message (worker -> farmer); `b` echoes the request's `a`.
typedef struct {
    int b;
} RESPONSE;

// argv[0] of the running program, saved by main()
char *pgmname;

// Queue names: mq_farmer holds the "/mq_request_..." name and mq_worker the
// "/mq_response_..." name.
static char mq_farmer[80];
static char mq_worker[80];

// Farmer entry point -- annotated copy of the question's code.
//
// The NOTE/BUG comments mark the defects in place; the code itself is left
// as posted (apart from style) so the annotations line up with it.  It
// creates one request and one response queue, forks three workers, then
// forks a receiver child while the parent sends the requests.
int
main(int argc,char **argv)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    pgmname = argv[0];

    // skip over the program name
    --argc;
    ++argv;

    // both queue names embed the farmer's pid
    sprintf(mq_farmer,"/mq_request_%s_%d","foo",getpid());
    sprintf(mq_worker,"/mq_response_%s_%d","bar",getpid());

    // define attr
    // NOTE/BUG: this can have random data in it
    struct mq_attr attr;

    attr.mq_maxmsg = 10;

    // NOTE/BUG: this is _the_ big one -- we're only doing a single request
    // queue -- each worker needs its _own_ request queue -- otherwise, a
    // single worker can _monopolize_ all messages for the other workers
    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t reqQueue = mq_open(mq_farmer,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

    // NOTE/BUG: this should be opened for reading
    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mq_worker,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

    // create the child processes (see process_test() and message_queue_test())
    int i;

    // NOTE/BUG: we must remember the child pid numbers so we can do waitpid
    // later
    for (i = 0; i < 3; i++) {
        pid_t processID = fork();

        if (processID < 0) {
            // error
        }

        else if (processID == 0) {
            // some code

            // NOTE/BUG: exec* takes strings so this is wrong
            execlp("./worker","worker",getpid(),i,NULL);
        }
    }

    // NOTE/BUG: on all mq_send/mq_receive, the return type is ssize_t and
    // _not_ mqd_t

    // second fork: the child is the receiver, the parent is the sender
    pid_t pid = fork();

    if (pid < 0) {
        // error
    }
    else {
        // receiving done here
        if (pid == 0) {
            for (i = 0; i < 3; i++) {

                // read the messages from the worker queue
                ssize_t received = mq_receive(respQueue,(char *) &resp,
                    sizeof(resp),NULL);

                // &resp is the local buffer's address; the payload is resp.b
                printf("Farmer received worker response: %p with length %ld value %d\n",
                    &resp,received,resp.b);
                // HERE &resp is always the same
            }

            // end worker process
            req.a = -1;
            sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
            printf("Farmer sent stop -- sent=%ld\n",sent);

            // NOTE/BUG: we need to exit here
        }

        // sending done here
        else {
            for (i = 0; i < 3; i++) {
                req.a = i;
                sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
                printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
            }
        }

    }

    // NOTE/BUG: we're waiting on the double fork farmer, but _not_
    // on the actual worker pids
    waitpid(pid,NULL,0);

    mq_close(reqQueue);
    mq_close(respQueue);

    // clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

// Worker entry point -- annotated copy of the question's worker main().
//
// Builds the queue names from the integer in argv[1], then serves requests
// until one with a negative value arrives.  The NOTE/BUG comments mark the
// defects; the code is left as posted so the annotations line up with it.
int
worker_main(int argc,char *argv[])
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    int arg1;

    // NOTE/BUG: use getppid instead
    sscanf(argv[1],"%d",&arg1);
    printf("worker: my index is %d ...\n",arg1);

    // arg1 is the number embedded in both queue names
    sprintf(mq_farmer,"/mq_request_%s_%d","foo",arg1);
    sprintf(mq_worker,"/mq_response_%s_%d","bar",arg1);

    mqd_t reqQueue = mq_open(mq_farmer,O_RDONLY);

    mqd_t respQueue = mq_open(mq_worker,O_WRONLY);

    while (1) {
        // receiving
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);

        printf("Worker received %p with length %ld value %d\n",
            &req,received,req.a);

        // received stop signal
        if (req.a < 0) {
            printf("stopping worker\n");
            break;
        }

        // waiting for farmer to fork
        sleep(3);

        // do something with request data
        resp.b = req.a;

        // send response
        // NOTE/BUG: last argument is unsigned int and _not_ pointer
#if 0
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),NULL);
#else
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
#endif

        // (this log prints the request buffer and value, not the response)
        printf("Worker sent response %p with length %ld value %d\n",
            &req,sent,req.a);
        // HERE &resp is always different (doesn't print)
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    // clean up the message queues
    // NOTE/BUG: farmer should do this -- not worker
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

这是清理后可以正常工作的版本。请注意,为了简单起见,我将农民和工人程序合并为一个程序,并在 main 中使用了一些技巧:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

// Request message (farmer -> worker); a negative `a` is the stop signal.
typedef struct {
    int a;
} REQUEST;

// Response message (worker -> farmer); `b` echoes the request's `a`.
typedef struct {
    int b;
} RESPONSE;

char *pgmname;                          // argv[0], reused by execlp in farmer()
int opt_x;                              // toggled by -x: exec the worker
int opt_W;                              // worker index from -W<n>; -1 in the farmer

// number of worker processes
#define WORKNR      3

char mqfile_to_farmer[80];              // response queue name (workers -> farmer)
char mqfile_to_worker[80];              // this worker's request queue name

// queue attributes passed to mq_open; file scope, so it starts zero-filled
struct mq_attr attr;

// pid of the farmer process (used to build the queue names)
pid_t ppid;

// per-worker control
struct worker {
    pid_t wk_pid;                       // pid of the forked worker
    mqd_t wk_req;                       // farmer's write descriptor for the queue
    char wk_mqfile[80];                 // name of this worker's request queue
};

// one entry per worker, indexed by worker number
struct worker worklist[WORKNR];

// worker -- serve requests from this worker's private queue
//
// Derives both queue names from the parent pid and the worker index in
// opt_W, then loops: receive a request, stop on a negative value, otherwise
// echo the value back to the farmer on the shared response queue.
//
// Never returns.  Exit status: 0 after a stop request, 76 if a queue cannot
// be opened, 77 on a receive failure, 78 on a send failure.
void
worker(void)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;
    int sverr;

    ppid = getppid();

    printf("worker: my index is %d ...\n",opt_W);

    sprintf(mqfile_to_farmer,"/mq_response_%d",(int) ppid);
    sprintf(mqfile_to_worker,"/mq_request_%d_%d",(int) ppid,opt_W);

    // fail early with a clear message instead of getting EBADF later on
    mqd_t reqQueue = mq_open(mqfile_to_worker,O_RDONLY);
    if (reqQueue == (mqd_t) -1) {
        printf("Worker %d: open fault on %s -- %s\n",
            opt_W,mqfile_to_worker,strerror(errno));
        exit(76);
    }

    mqd_t respQueue = mq_open(mqfile_to_farmer,O_WRONLY);
    if (respQueue == (mqd_t) -1) {
        printf("Worker %d: open fault on %s -- %s\n",
            opt_W,mqfile_to_farmer,strerror(errno));
        mq_close(reqQueue);
        exit(76);
    }

    while (1) {
        // receiving
        errno = 0;
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);
        sverr = errno;

        // req holds no valid data after a failed receive, so do not print it
        if (received < 0) {
            printf("Worker %d received %p with length %zd -- %s\n",
                opt_W,(void *) &req,received,strerror(sverr));
            exit(77);
        }

        printf("Worker %d received %p with length %zd value %d -- %s\n",
            opt_W,(void *) &req,received,req.a,strerror(sverr));

        // received stop signal
        if (req.a < 0) {
            printf("stopping worker\n");
            break;
        }

        // do something with request data
        resp.b = req.a;

        // send response
        errno = 0;
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
        sverr = errno;

        // log what was actually sent: the response buffer and its value
        printf("Worker %d sent response %p with length %zd value %d -- %s\n",
            opt_W,(void *) &resp,sent,resp.b,strerror(sverr));
        // &resp is the address of the local buffer, not message content
        if (sent < 0)
            exit(78);
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    exit(0);
}

// farmer -- create the queues, start the workers and drive one round of work
//
// Creates one response queue (workers -> farmer) and one request queue per
// worker, forks WORKNR workers (exec'ing this program with -W<n> when opt_x
// is set, otherwise calling worker() directly), then forks a receiver child.
// The receiver reads WORKNR responses and sends every worker a stop request;
// the parent sends one request to each worker.  Finally the parent waits for
// the receiver and the workers and removes all queues.
//
// Exits with 1 if a queue cannot be created, 9 or 5 if a fork fails and 7 if
// execlp fails.  Send and receive failures are reported but not fatal, so
// the stop requests are still sent and the workers can terminate.
void
farmer(void)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;
    struct worker *wk;
    int sverr;

    ppid = getpid();

    sprintf(mqfile_to_farmer,"/mq_response_%d",(int) ppid);

    attr.mq_maxmsg = 10;

    // the response queue carries RESPONSE messages
    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mqfile_to_farmer,
        O_RDONLY | O_CREAT | O_EXCL,0600,&attr);
    if (respQueue == (mqd_t) -1) {
        printf("farmer: respQueue open fault -- %s\n",strerror(errno));
        exit(1);
    }

    // create the child processes (see process_test() and message_queue_test())
    int i;

    // create the separate request queues -- these carry REQUEST messages
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];
        attr.mq_msgsize = sizeof(REQUEST);
        sprintf(wk->wk_mqfile,"/mq_request_%d_%d",(int) ppid,i);
        wk->wk_req = mq_open(wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL,0600,
            &attr);
        if (wk->wk_req == (mqd_t) -1) {
            printf("farmer: wk_req open fault -- %s\n",strerror(errno));
            exit(1);
        }
    }

    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];

        pid_t pid = fork();

        if (pid < 0) {
            perror("fork");
            exit(9);
        }

        // parent: remember the worker's pid for the waitpid at the end
        if (pid != 0) {
            wk->wk_pid = pid;
            continue;
        }

        // NOTE/FIX: exec* takes strings so this is the correct way
        if (opt_x) {
            char xid[20];
            sprintf(xid,"-W%d",i);
            execlp(pgmname,pgmname,xid,NULL);
            perror("execlp");
            exit(7);
        }

        // simulate what exec would do -- call it direct
        // (worker() exits, so the child never continues this loop)
        opt_W = i;
        worker();
    }

    pid_t pid = fork();

    if (pid < 0) {
        perror("fork2");
        exit(5);
    }

    // receiving done here
    if (pid == 0) {
        for (i = 0; i < WORKNR; i++) {

            // read the messages from the worker queue
            errno = 0;
            ssize_t received = mq_receive(respQueue,(char *) &resp,
                sizeof(resp),NULL);

            // resp is not valid after a failed receive; give up on further
            // responses but still fall through to stop the workers
            if (received < 0) {
                printf("Farmer receive fault -- %s\n",strerror(errno));
                break;
            }

            // &resp is the local buffer's address; the payload is resp.b
            printf("Farmer received worker response: %p with length %zd value %d\n",
                (void *) &resp,received,resp.b);
        }

        // end worker process
        for (i = 0; i < WORKNR; i++) {
            wk = &worklist[i];
            req.a = -1;
            errno = 0;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            sverr = errno;
            printf("Farmer sent stop -- sent=%zd\n",sent);
            if (sent < 0)
                printf("Farmer stop fault on worker %d -- %s\n",
                    i,strerror(sverr));
        }

        // exit the farmer's receiver
        printf("farmer: receiver exiting ...\n");
        exit(0);
    }

    // sending done here
    else {
        for (i = 0; i < WORKNR; i++) {
            wk = &worklist[i];
            req.a = i;
            errno = 0;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            sverr = errno;
            printf("Farmer sent to i=%d -- sent=%zd\n",i,sent);
            if (sent < 0)
                printf("Farmer send fault on worker %d -- %s\n",
                    i,strerror(sverr));
        }

        // wait for farmer's receiver to complete
        printf("farmer: waiting for receiver to finish ...\n");
        waitpid(pid,NULL,0);
    }

    mq_close(respQueue);

    // wait for all workers to complete
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];
        printf("farmer: waiting for worker to finish ...\n");
        waitpid(wk->wk_pid,NULL,0);
        mq_close(wk->wk_req);
        mq_unlink(wk->wk_mqfile);
    }

    // clean up the message queues
    mq_unlink(mqfile_to_farmer);
}

// Program entry point: parse the leading -W<n> / -x options, then run as a
// worker when a worker index was given, otherwise as the farmer.
int
main(int argc,char **argv)
{
    int idx;

    pgmname = argv[0];
    opt_W = -1;

    // options end at the first argument that does not start with '-'
    for (idx = 1;  idx < argc;  ++idx) {
        const char *arg = argv[idx];

        if (arg[0] != '-')
            break;

        if (arg[1] == 'W') {
            // worker index follows the option letter directly
            opt_W = atoi(&arg[2]);
        }
        else if (arg[1] == 'x') {
            // each -x flips exec mode
            opt_x = ! opt_x;
        }
    }

    if (opt_W < 0)
        farmer();
    else
        worker();

    return 0;
}

更新#2:

这是一个演示单个请求队列与多个请求队列的版本。工作人员现在检查他们收到的消息中的目标 ID 是否与他们的工作人员编号相匹配。

如果直接运行它而不带任何选项,您将得到多个队列和"正确的"输出。

如果使用 -b [以及可选的 -s] 运行它,您将得到单个请求队列,并且程序会看到被错误路由的消息(例如 worker 0 抓取了一条本该发给 worker 1 的消息)。

单个队列是一个子集。只要工人是"equal",就可以了。但是,如果他们不是(例如,一个工人可以做其他人不能做的事情),那么能够排队到正确的工人就很重要了。一个例子是网络节点具有其他节点没有的特殊 FPGA 辅助计算硬件,并且一些请求需要这种加速。

此外,单个队列由工人自行实现负载均衡。这是一种调度形式,但还有其他模式(例如,农民希望保留对劳动力分配的控制权)。或者,农民必须停止一名工人并让其他工人继续工作(例如,被停止的系统将断电进行维护)。

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

typedef unsigned int u32;

// Message layout used on both the request and the response queues.
typedef struct {
    u32 seqno;                          // sequence number
    int toval;                          // destination id
    int fmval;                          // responder worker id
} request_t;

char *pgmname;                          // argv[0], reused by execlp in farmer()
int opt_b;                              // 1=broadcast
int opt_i;                              // 1=ignore errors
int opt_x;                              // 1=do execlp
int opt_s;                              // number of ms to sleep
int opt_S;                              // sequence maximum
int opt_W;                              // worker xid

#define WORKNR      3                   // number of worker processes
#define MAXMSG      10                  // mq_maxmsg used for every queue

char mqfile_to_farmer[80];              // response queue name
mqd_t respQueue;                        // response queue descriptor

char mqfile_to_worker[80];              // request queue name
mqd_t reqQueue;                         // request queue descriptor

// queue attributes shared by every reqopen() call
struct mq_attr attr;

pid_t ppid;                             // farmer's pid (0 until it is known)
pid_t curpid;                           // pid of the current process
pid_t pidrcvr;                          // pid of the farmer's receiver child

// per-worker control
typedef struct {
    int wk_xid;                         // worker index
    pid_t wk_pid;                       // pid of the forked worker
    mqd_t wk_req;                       // farmer's descriptor for its request queue
    u32 wk_seqno;                       // sequence number tracked for this worker
    char wk_mqfile[80];                 // name of the per-worker request queue
} worker_t;
worker_t worklist[WORKNR];

// The three clauses of a for loop that walks `wk` over every worklist entry;
// the caller must have a `worker_t *wk` in scope: for (FORALL_WK) { ... }
#define FORALL_WK \
    wk = &worklist[0];  wk < &worklist[WORKNR];  ++wk

// Fatal error: print the printf-style message, send SIGUSR1 to the farmer
// (if ppid is set) and exit with status 1.  The `_fmt...` parameter is the
// GNU named-variadic macro form.
#define sysfault(_fmt...) \
    do { \
        printf(_fmt); \
        if (ppid) \
            kill(ppid,SIGUSR1); \
        exit(1); \
    } while (0)

// Function whose body is only an empty inline-asm statement.
// NOTE(review): nothing in the visible source calls it -- presumably kept as
// a debugger breakpoint hook; confirm before removing.
void
_sysfault(void)
{

    // empty asm statement: emits no instructions
    __asm__ __volatile__("" :::);
}

// Log a printf-style message preceded by the prefix that _logprt() prints.
// errno is saved before and restored after, so logging does not disturb an
// error code the caller still wants to report.
#define logprt(_fmt...) \
    do { \
        int sverr = errno; \
        _logprt(); \
        printf(_fmt); \
        errno = sverr; \
    } while (0)

// id that selects the log prefix for this process (set by loginit)
int logxid;
// time of the first log line; later timestamps are printed relative to it
double logzero;

// Select the log prefix for the current process.
// xid: WORKNR for the farmer ("F"), WORKNR + 1 for the receiver ("R"),
// anything else is printed as a worker number ("W<xid>").
void
loginit(int xid)
{

    logxid = xid;
}

// Print the log-line prefix: seconds elapsed since the first log call,
// followed by a tag naming the role selected through loginit().
void
_logprt(void)
{
    struct timespec now;

    clock_gettime(CLOCK_REALTIME,&now);

    // wall-clock time as fractional seconds
    double stamp = (double) now.tv_nsec / 1e9 + (double) now.tv_sec;

    // the first call defines time zero
    if (logzero == 0)
        logzero = stamp;

    double elapsed = stamp - logzero;

    if (logxid == WORKNR) {
        // farmer
        printf("%.9f LOG F: ",elapsed);
    }
    else if (logxid == WORKNR + 1) {
        // farmer's receiver
        printf("%.9f LOG R: ",elapsed);
    }
    else {
        // worker number logxid
        printf("%.9f LOG W%d: ",elapsed,logxid);
    }
}

// Terminate the current process with the given exit status.
// Single exit point used by the logging-aware code paths; never returns.
void
logexit(int code)
{

    exit(code);
}

void
allwait(void)
{
    worker_t *wk;

    // wait for farmer's receiver to complete
    if (pidrcvr) {
        logprt("farmer: waiting for receiver to finish ...\n");
        waitpid(pidrcvr,NULL,0);
        pidrcvr = 0;
    }

    for (FORALL_WK) {
        if (wk->wk_pid) {
            logprt("farmer: waiting for worker %d to finish ...\n",wk->wk_xid);
            waitpid(wk->wk_pid,NULL,0);
            wk->wk_pid = 0;
        }

        if (opt_b)
            continue;

        logprt("farmer: closing and removing worker queue ...\n");
        mq_close(wk->wk_req);
        mq_unlink(wk->wk_mqfile);
    }
}

void
sighdr(int signo)
{
    worker_t *wk;

    switch (signo) {
    case SIGUSR1:  // request to master
        logprt("sighdr: got master stop signal ...\n");

        if (pidrcvr)
            kill(pidrcvr,SIGUSR2);

        for (FORALL_WK) {
            if (wk->wk_pid)
                kill(wk->wk_pid,SIGUSR2);
        }

        allwait();
        logprt("farmer: abnormal termination\n");

        logexit(1);
        break;

    case SIGUSR2:  // request to slaves
        logexit(1);
        break;
    }
}

// reqopen -- open (creating if needed) a queue of request_t messages
//
// fdp:  in/out descriptor; a descriptor >= 0 already stored there is closed
//       first, and the new descriptor is stored on success
// file: queue name
// flag: mq_open access flags; O_CREAT is always added
//
// Uses the global attr for the requested MAXMSG / sizeof(request_t) limits;
// attr is then overwritten with the queue's actual attributes.  Any failure,
// including a queue whose message size is not sizeof(request_t), is fatal
// (sysfault).
// NOTE(review): because O_CREAT is forced, a worker opening a queue that the
// farmer has not created (or already removed) creates it instead of failing.
void
reqopen(mqd_t *fdp,const char *file,int flag)
{
    mqd_t fd;
    int err;

    attr.mq_maxmsg = MAXMSG;
    attr.mq_msgsize = sizeof(request_t);

    // drop a descriptor inherited across fork before reopening
    fd = *fdp;
    if (fd >= 0)
        mq_close(fd);

    fd = mq_open(file,flag | O_CREAT,0600,&attr);
    if (fd < 0)
        sysfault("reqopen: %s open fault -- %s\n",file,strerror(errno));

    err = mq_getattr(fd,&attr);
    if (err < 0)
        sysfault("reqopen: %s getattr fault -- %s\n",file,strerror(errno));

    // an existing queue keeps its own size, so verify it matches ours
    // (mq_msgsize is a long; sizeof yields a size_t and needs %zu)
    if (attr.mq_msgsize != (long) sizeof(request_t))
        sysfault("reqopen: %s size fault -- mq_msgsize=%ld siz=%zu\n",
            file,attr.mq_msgsize,sizeof(request_t));

    logprt("reqopen: open -- file='%s' fd=%d\n",file,(int) fd);

    *fdp = fd;
}

void worker(int execflg);

void
farmer(void)
{
    request_t req;
    request_t resp;
    ssize_t sent;
    worker_t *wk;
    u32 seqno;
    int xid;

    ppid = getpid();
    curpid = ppid;
    loginit(WORKNR);

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
    sprintf(mqfile_to_worker,"/mq_request_%d",ppid);

    respQueue = -1;
    reqopen(&respQueue,mqfile_to_farmer,O_RDONLY | O_CREAT | O_EXCL);

    reqQueue = -1;
    if (opt_b)
        reqopen(&reqQueue,mqfile_to_worker,O_WRONLY | O_CREAT | O_EXCL);

    // create the separate request queues
    xid = 0;
    for (FORALL_WK) {
        wk->wk_xid = xid++;

        if (opt_b) {
            logprt("farmer: common request queue -- reqQueue=%d\n",reqQueue);
            wk->wk_req = reqQueue;
            continue;
        }

        sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,wk->wk_xid);

        wk->wk_req = -1;
        reqopen(&wk->wk_req,wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL);
        logprt("farmer: separate request queue -- wk_req=%d\n",wk->wk_req);
    }

    // fork the workers
    for (FORALL_WK) {
        pid_t pid = fork();

        if (pid < 0)
            sysfault("farmer: fork fault -- %s\n",strerror(errno));

        if (pid != 0) {
            wk->wk_pid = pid;
            continue;
        }

        // NOTE/FIX: exec* takes strings so this is the correct way
        if (opt_x) {
            char opt[2][20];

            sprintf(opt[0],"-b%d",opt_b);
            sprintf(opt[1],"-W%d",wk->wk_xid);

            execlp(pgmname,pgmname,opt[0],opt[1],NULL);
            sysfault("farmer: execlp error -- %s\n",strerror(errno));
        }

        // simulate what exec would do -- call it direct
        opt_W = wk->wk_xid;
        worker(0);
    }

    pidrcvr = fork();
    if (pidrcvr < 0)
        sysfault("farmer: fork2 error -- %s\n",strerror(errno));

    // receiving done here
    if (pidrcvr == 0) {
        curpid = getpid();
        loginit(WORKNR + 1);

        for (int i = 0; i < (WORKNR * opt_S); i++) {
            // read the messages from the worker queue
            ssize_t received = mq_receive(respQueue,(char *) &resp,
                sizeof(resp),NULL);

            wk = &worklist[resp.fmval];
            logprt("received worker response: length %d fmval=%d seqno=%u wk_seqno=%u\n",
                (int) received,resp.fmval,resp.seqno,wk->wk_seqno);

            if (received < 0) {
                if (! opt_i)
                    sysfault("farmer: received fault -- %s\n",strerror(errno));
            }

            if (resp.seqno != wk->wk_seqno) {
                logprt("sequence fault\n");
                if (! opt_i)
                    sysfault("farmer: sequence fault\n");
            }

            ++wk->wk_seqno;
        }

        // send stop to worker processes
        for (FORALL_WK) {
            req.toval = -1;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            logprt("Farmer sent stop -- wk_xid=%d sent=%d\n",
                wk->wk_xid,(int) sent);

            if (sent < 0) {
                if (! opt_i)
                    sysfault("farmer: send fault on stop -- %s\n",
                        strerror(errno));
            }
        }

        // exit the farmer's receiver
        logprt("farmer: receiver exiting ...\n");
        logexit(0);
    }

    // sending done here
    else {
        for (seqno = 0;  seqno < opt_S;  ++seqno) {
            for (FORALL_WK) {
                wk->wk_seqno = seqno;
                req.seqno = seqno;
                req.toval = wk->wk_xid;

                sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
                logprt("Farmer sent to wk_xid=%d wk_req=%d -- sent=%d\n",
                    wk->wk_xid,wk->wk_req,(int) sent);
                if (sent < 0) {
                    if (! opt_i)
                        sysfault("farmer: send fault -- %s\n",strerror(errno));
                }
            }
        }
    }

    mq_close(respQueue);

    // wait for all workers to complete
    allwait();

    // clean up the message queues
    mq_unlink(mqfile_to_farmer);

    logprt("farmer: complete\n");
    logexit(0);
}

// worker -- serve requests until a stop message (toval < 0) arrives
//
// execflg: nonzero when this process was started through execlp, in which
//          case the farmer's pid is taken from getppid(); zero when called
//          directly after fork, where ppid is already set.
//
// Opens the response queue and the request queue (shared in broadcast mode,
// per-worker otherwise), optionally sleeps once before the first receive,
// then echoes every request back with fmval set to this worker's index.
// Misrouted or out-of-sequence requests are fatal unless opt_i is set.
// Never returns.
void
worker(int execflg)
{
    request_t inmsg;
    request_t outmsg;
    ssize_t nsent;
    u32 expect = 0;
    int delay_us;

    if (execflg)
        ppid = getppid();
    curpid = getpid();

    loginit(opt_W);
    logprt("worker: my index is %d ...\n",opt_W);

    attr.mq_maxmsg = MAXMSG;

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
    reqopen(&respQueue,mqfile_to_farmer,O_WRONLY);

    if (! opt_b)
        sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);
    else
        sprintf(mqfile_to_worker,"/mq_request_%d",ppid);
    reqopen(&reqQueue,mqfile_to_worker,O_RDONLY);

    // startup delay in microseconds, scaled by the worker index; it is
    // applied once, before the first receive
    delay_us = opt_s * 1000 * opt_W;
    if (delay_us > 0) {
        logprt("sleep %d\n",delay_us);
        usleep(delay_us);
    }

    for (;;) {
        // receiving
        errno = 0;
        ssize_t nrecv = mq_receive(reqQueue,(char *) &inmsg,
            sizeof(inmsg),NULL);

        logprt("received length %d -- seqno=%u toval=%d\n",
            (int) nrecv,inmsg.seqno,inmsg.toval);

        if (nrecv < 0)
            sysfault("worker: mq_receive fault -- %s\n",strerror(errno));

        // received stop signal
        if (inmsg.toval < 0) {
            logprt("stopping ...\n");
            break;
        }

        // a request addressed to some other worker
        if (inmsg.toval != opt_W) {
            logprt("misroute\n");
            if (! opt_i)
                sysfault("worker: misroute fault\n");
        }

        // requests must arrive in order
        if (inmsg.seqno != expect) {
            logprt("sequence fault\n");
            if (! opt_i)
                sysfault("worker: sequence fault\n");
        }

        // do something with request data
        outmsg.fmval = opt_W;
        outmsg.toval = inmsg.toval;
        outmsg.seqno = inmsg.seqno;

        // send response
        errno = 0;
        nsent = mq_send(respQueue,(char *) &outmsg,sizeof(outmsg),0);

        logprt("sent response with length %d -- seqno=%u toval=%d\n",
            (int) nsent,inmsg.seqno,outmsg.toval);

        if (nsent < 0)
            sysfault("worker: mq_send fault -- %s\n",strerror(errno));

        ++expect;
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    logexit(0);
}

// Program entry point: install the signal handlers, parse the leading
// options and run as a worker (-W<n> given) or as the farmer.
//
// Options (the value follows the letter directly, e.g. -S5):
//   -b[n]  broadcast mode: single shared request queue (default 1)
//   -i[n]  ignore errors (default 1)
//   -S[n]  number of request rounds (default 3)
//   -s[n]  startup sleep in ms, scaled by the worker index (default 3)
//   -W<n>  run as worker number n
//   -x     toggle execlp of the workers
// Unknown options are ignored; parsing stops at the first argument that
// does not start with '-'.
int
main(int argc,char **argv)
{
    char *cp;

    pgmname = argv[0];

    --argc;
    ++argv;

    opt_W = -1;
    opt_S = 3;

    reqQueue = -1;
    respQueue = -1;

    signal(SIGUSR1,sighdr);
    signal(SIGUSR2,sighdr);

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'b':  // broadcast mode (single request queue)
            cp += 2;
            opt_b = (*cp != 0) ? atoi(cp) : 1;
            break;

        case 'i':  // ignore errors
            cp += 2;
            opt_i = (*cp != 0) ? atoi(cp) : 1;
            break;

        case 'S':  // sequence maximum
            cp += 2;
            opt_S = (*cp != 0) ? atoi(cp) : 3;
            break;

        case 's':  // sleep mode (milliseconds)
            cp += 2;
            opt_s = (*cp != 0) ? atoi(cp) : 3;
            break;

        case 'W':  // worker number
            // cp already points at the digits after this; skipping another
            // two characters would read past the end of "-W<n>"
            cp += 2;
            opt_W = atoi(cp);
            break;

        case 'x':  // use execlp
            opt_x = ! opt_x;
            break;

        default:  // unknown option: ignored
            break;
        }
    }

    if (opt_W >= 0)
        worker(1);
    else
        farmer();

    return 0;
}