mqueue получает неверные данные

Ниже приведен код для задания по переработке сельхозпродукции. Основное внимание уделяется комментариям: "ЗДЕСЬ $ или всегда одинаковые / разные". Это моя проблема: когда рабочий процесс выполняет свою работу и отправляет данные ответа фермеру, фермер всегда получает одни и те же данные ответа (один и тот же адрес указателя), даже если рабочий каждый раз отправляет разные данные.

Пример: рабочие отправляют данные по адресам: 0x7fff42318a90,0x7ffddba97390,0x7ffc69e8e060 и т.д., и фермер продолжает получать данные только с одного адреса 0x7ffdb1496f30

Я сделал все возможное, чтобы абстрагировать код и задавать как можно больше вопросов. Если я пропустил важную информацию, пожалуйста, дайте мне знать, я новичок в программировании управления процессами, и я мог бы использовать некоторые рекомендации.

ОБНОВЛЕНИЕ: также печать содержимого resp са resp.b где b Целое число возвращает одно и то же значение, даже если это значение отличается в рабочем.

ОБНОВЛЕНИЕ: я попытался написать какой-нибудь исполняемый код только на этот раз, если рабочий не получал.

// как у фермера, так и у рабочего

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>         // for execlp
#include <mqueue.h>         // for mq

typedef struct{

    int a;

} REQUEST;

typedef struct{

    int b;

} RESPONSE;

static char mq_farmer[80];
static char mq_worker[80];

// фермер:

int main (int argc, char * argv[])
{

    REQUEST req;
    RESPONSE resp;

    sprintf (mq_farmer, "/mq_request_%s_%d", "foo", getpid());
    sprintf (mq_worker, "/mq_response_%s_%d", "bar", getpid());

    //define attr
    struct mq_attr attr;

    attr.mq_maxmsg= 10;

    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t reqQueue = mq_open(mq_farmer, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mq_worker, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);

    //  * create the child processes (see process_test() and message_queue_test())
    int i;
    for(i = 0; i < 3; i++)
        {
            pid_t processID = fork();
            if(processID < 0)
                {
                    //error
                }

            else if(processID == 0)
                {
                    //some code

                    execlp("./worker","worker", getpid(), i, NULL);
                }
        }

    pid_t pid = fork();


    if(pid < 0)
        {
            //error
        }
    else
        {
            if(pid == 0) //receiving done here
                {
                    for(i = 0; i < 3; i++)
                        {

                            // read the messages from the worker queue
                            mqd_t received = mq_receive (respQueue, (char *) &resp, sizeof(resp), NULL);
                            printf("Farmer received worker response: %p\n with value %d\n", &resp, resp.b);
                            //HERE &resp is always the same


                        }

                    // end worker process
                    req.a = -1;
                    mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

                }
            else //sending done here
                {
                    for(i = 0; i < 3; i++)
                        {
                            req.a = i;
                            mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);

                        }
                }


        }

    waitpid(pid, NULL, 0);
    mq_close(reqQueue);
    mq_close(respQueue);


    //clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

// Рабочий:

int main (int argc, char * argv[])
{

    REQUEST req;
    RESPONSE resp;

    int arg1;

    sscanf(argv[1], "%d", &arg1);

    sprintf (mq_farmer, "/mq_request_%s_%d", "foo", arg1);
    sprintf (mq_worker, "/mq_response_%s_%d", "bar",arg1);

    mqd_t reqQueue = mq_open (mq_farmer, O_RDONLY);

    mqd_t respQueue = mq_open (mq_worker, O_WRONLY);

    while (true){

        //receiving
        mqd_t received = mq_receive (reqQueue, (char *) &req,
                                     sizeof(req), NULL);

        printf("Worker received %p with value %d\n", &req, req.a);

        //received stop signal
        if(req.a < 0){
            printf("stopping worker\n");
            break;
        }

        //waiting for farmer to fork 
        sleep(3);

        //do something with request data
        resp.b = req.a;

        //send response
        mqd_t sent = mq_send (respQueue, (char *) &resp,

                              sizeof (resp), NULL);

        printf("Worker sent response: %p\n", &resp);
        //HERE &resp is always different (doesn't print)
    }

    mq_close(reqQueue);
    mq_close(respQueue);


    //clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);


    return 0;
}

1 ответ

Решение

Когда вы звоните mq_receive он помещает данные в буфер, на который указывает второй аргумент, который вы задаете как &resp, Это не меняет сам указатель.

&resp является фиксированным адресом в родительском элементе, если только вы его не измените, что маловероятно из размещенного кода [который не показывает определения resp], так:

printf("Received worker response: %p\n", &resp);

Вы всегда получите одно и то же значение.

Что вы [вероятно] хотите сделать, это напечатать что resp содержит


ОБНОВИТЬ:

Хорошо, было еще несколько ошибок.

Большая ошибка заключается в том, что, хотя у вас может быть одна очередь для сообщений от рабочего к фермеру (то есть очередь ответов), вы не можете использовать одну очередь для запросов к работникам. Каждому из них нужна своя очередь запросов.

В противном случае один работник может принять / монополизировать все запросы, даже те, которые принадлежат другим. Если это произойдет, фермер, скорее всего, увидит сообщения с печатью только от этого работника.

Это то, что вы видите, потому что первый рабочий [вероятно, № 0] имеет mq_receive завершить первым. Это так быстро, что он делает все mq_receive/mq_send прежде чем другие смогут добраться до них.

Затем он увидит сообщение "стоп" и выйдет. Если другим "повезло", первый работник оставил оставшиеся сообщения об остановке в очереди. Но сообщений о запросах нет, поэтому они никогда не отправляют ответ.

Кроме того, очередь ответа была открыта фермером с O_WRONLY вместо O_RDONLY,

Я подготовил две версии вашей программы. Один с аннотациями для ошибок. Другой, который вычищен и работает.


Вот аннотированная версия [прошу прощения за беспричинную очистку стиля]:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

typedef struct {
    int a;
} REQUEST;

typedef struct {
    int b;
} RESPONSE;

char *pgmname;

static char mq_farmer[80];
static char mq_worker[80];

int
main(int argc,char **argv)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    pgmname = argv[0];

    --argc;
    ++argv;

    sprintf(mq_farmer,"/mq_request_%s_%d","foo",getpid());
    sprintf(mq_worker,"/mq_response_%s_%d","bar",getpid());

    // define attr
    // NOTE/BUG: this can have random data in it
    struct mq_attr attr;

    attr.mq_maxmsg = 10;

    // NOTE/BUG: this is _the_ big one -- we're only doing a single request
    // queue -- each worker needs its _own_ request queue -- otherwise, a
    // single worker can _monopolize_ all messages for the other workers
    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t reqQueue = mq_open(mq_farmer,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

    // NOTE/BUG: this should be opened for reading
    attr.mq_msgsize = sizeof(RESPONSE);
    mqd_t respQueue = mq_open(mq_worker,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);

    // create the child processes (see process_test() and message_queue_test())
    int i;

    // NOTE/BUG: we must remember the child pid numbers so we can do waitpid
    // later
    for (i = 0; i < 3; i++) {
        pid_t processID = fork();

        if (processID < 0) {
            // error
        }

        else if (processID == 0) {
            // some code

            // NOTE/BUG: exec* takes strings so this is wrong
            execlp("./worker","worker",getpid(),i,NULL);
        }
    }

    // NOTE/BUG: on all mq_send/mq_receive, the return type is ssize_t and
    // _not_ mqd_t

    pid_t pid = fork();

    if (pid < 0) {
        // error
    }
    else {
        // receiving done here
        if (pid == 0) {
            for (i = 0; i < 3; i++) {

                // read the messages from the worker queue
                ssize_t received = mq_receive(respQueue,(char *) &resp,
                    sizeof(resp),NULL);

                printf("Farmer received worker response: %p with length %ld value %d\n",
                    &resp,received,resp.b);
                // HERE &resp is always the same
            }

            // end worker process
            req.a = -1;
            sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
            printf("Farmer sent stop -- sent=%ld\n",sent);

            // NOTE/BUG: we need to exit here
        }

        // sending done here
        else {
            for (i = 0; i < 3; i++) {
                req.a = i;
                sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
                printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
            }
        }

    }

    // NOTE/BUG: we're waiting on the double fork farmer, but _not_
    // on the actual worker pids
    waitpid(pid,NULL,0);

    mq_close(reqQueue);
    mq_close(respQueue);

    // clean up the message queues
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

int
worker_main(int argc,char *argv[])
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    int arg1;

    // NOTE/BUG: use getppid instead
    sscanf(argv[1],"%d",&arg1);
    printf("worker: my index is %d ...\n",arg1);

    sprintf(mq_farmer,"/mq_request_%s_%d","foo",arg1);
    sprintf(mq_worker,"/mq_response_%s_%d","bar",arg1);

    mqd_t reqQueue = mq_open(mq_farmer,O_RDONLY);

    mqd_t respQueue = mq_open(mq_worker,O_WRONLY);

    while (1) {
        // receiving
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);

        printf("Worker received %p with length %ld value %d\n",
            &req,received,req.a);

        // received stop signal
        if (req.a < 0) {
            printf("stopping worker\n");
            break;
        }

        // waiting for farmer to fork
        sleep(3);

        // do something with request data
        resp.b = req.a;

        // send response
        // NOTE/BUG: last argument is unsigned int and _not_ pointer
#if 0
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),NULL);
#else
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
#endif

        printf("Worker sent response %p with length %ld value %d\n",
            &req,sent,req.a);
        // HERE &resp is always different (doesn't print)
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    // clean up the message queues
    // NOTE/BUG: farmer should do this -- not worker
    mq_unlink(mq_farmer);
    mq_unlink(mq_worker);

    return 0;
}

Вот очищенная и рабочая версия. Обратите внимание, что для простоты / простоты я объединил как фермерские, так и рабочие программы в одну, используя немного хитрости в main:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

typedef struct {
    int a;
} REQUEST;

typedef struct {
    int b;
} RESPONSE;

char *pgmname;
int opt_x;
int opt_W;

#define WORKNR      3

char mqfile_to_farmer[80];
char mqfile_to_worker[80];

struct mq_attr attr;

pid_t ppid;

// per-worker control
struct worker {
    pid_t wk_pid;
    mqd_t wk_req;
    char wk_mqfile[80];
};

struct worker worklist[WORKNR];

void
worker(void)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;

    ppid = getppid();

    printf("worker: my index is %d ...\n",opt_W);

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
    sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);

    mqd_t reqQueue = mq_open(mqfile_to_worker,O_RDONLY);
    mqd_t respQueue = mq_open(mqfile_to_farmer,O_WRONLY);

    while (1) {
        // receiving
        errno = 0;
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);

        printf("Worker %d received %p with length %ld value %d -- %s\n",
            opt_W,&req,received,req.a,strerror(errno));
        if (received < 0)
            exit(77);

        // received stop signal
        if (req.a < 0) {
            printf("stopping worker\n");
            break;
        }

        // do something with request data
        resp.b = req.a;

        // send response
        errno = 0;
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);

        printf("Worker %d sent response %p with length %ld value %d -- %s\n",
            opt_W,&req,sent,req.a,strerror(errno));
        // HERE &resp is always different (doesn't print)
        if (sent < 0)
            exit(78);
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    exit(0);
}

void
farmer(void)
{

    REQUEST req;
    RESPONSE resp;
    ssize_t sent;
    struct worker *wk;

    ppid = getpid();

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);

    attr.mq_maxmsg = 10;

    attr.mq_msgsize = sizeof(REQUEST);
    mqd_t respQueue = mq_open(mqfile_to_farmer,
        O_RDONLY | O_CREAT | O_EXCL,0600,&attr);
    if (respQueue < 0) {
        printf("farmer: respQueue open fault -- %s\n",strerror(errno));
        exit(1);
    }

    // create the child processes (see process_test() and message_queue_test())
    int i;

    // create the separate request queues
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];
        attr.mq_msgsize = sizeof(RESPONSE);
        sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,i);
        wk->wk_req = mq_open(wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL,0600,
            &attr);
        if (wk->wk_req < 0) {
            printf("farmer: wk_req open fault -- %s\n",strerror(errno));
            exit(1);
        }
    }

    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];

        pid_t pid = fork();

        if (pid < 0) {
            perror("fork");
            exit(9);
        }

        if (pid != 0) {
            wk->wk_pid = pid;
            continue;
        }

        // NOTE/FIX: exec* takes strings so this is the correct way
        if (opt_x) {
            char xid[20];
            sprintf(xid,"-W%d",i);
            execlp(pgmname,pgmname,xid,NULL);
            perror("execlp");
            exit(7);
        }

        // simulate what exec would do -- call it direct
        opt_W = i;
        worker();
    }

    pid_t pid = fork();

    if (pid < 0) {
        perror("fork2");
        exit(5);
    }

    // receiving done here
    if (pid == 0) {
        for (i = 0; i < WORKNR; i++) {

            // read the messages from the worker queue
            ssize_t received = mq_receive(respQueue,(char *) &resp,
                sizeof(resp),NULL);

            printf("Farmer received worker response: %p with length %ld value %d\n",
                &resp,received,resp.b);
            // HERE &resp is always the same
        }

        // end worker process
        for (i = 0; i < WORKNR; i++) {
            wk = &worklist[i];
            req.a = -1;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            printf("Farmer sent stop -- sent=%ld\n",sent);
        }

        // exit the farmer's receiver
        printf("farmer: receiver exiting ...\n");
        exit(0);
    }

    // sending done here
    else {
        for (i = 0; i < WORKNR; i++) {
            wk = &worklist[i];
            req.a = i;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
        }

        // wait for farmer's receiver to complete
        printf("farmer: waiting for receiver to finish ...\n");
        waitpid(pid,NULL,0);
    }

    mq_close(respQueue);

    // wait for all workers to complete
    for (i = 0; i < WORKNR; i++) {
        wk = &worklist[i];
        printf("farmer: waiting for worker to finish ...\n");
        waitpid(wk->wk_pid,NULL,0);
        mq_close(wk->wk_req);
        mq_unlink(wk->wk_mqfile);
    }

    // clean up the message queues
    mq_unlink(mqfile_to_farmer);
}

int
main(int argc,char **argv)
{
    char *cp;

    pgmname = argv[0];

    --argc;
    ++argv;

    opt_W = -1;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'W':
            opt_W = atoi(cp + 2);
            break;
        case 'x':
            opt_x = ! opt_x;
            break;
        }
    }

    if (opt_W >= 0)
        worker();
    else
        farmer();

    return 0;
}

ОБНОВЛЕНИЕ № 2:

Вот версия, которая демонстрирует одну или несколько очередей запросов. Рабочие теперь проверяют идентификатор получателя в полученном сообщении, совпадая с номером их работника.

Если вы просто запустите его без параметров, вы получите несколько очередей и "хороший" вывод.

Если вы запустите его с -b [и опционально -s] вы получите единственную очередь запросов, и программа увидит неправильно доставленные сообщения (например, работник 0 получает сообщение, предназначенное для работника 1).

Единая очередь является подмножеством. Пока рабочие "равны", все в порядке. Но, если они этого не делают (например, один работник может делать то, что другие не могут), важно иметь возможность ставить в очередь правильного работника. Примером может служить сетевой узел, который имеет специальное аппаратное обеспечение для расчета с помощью ПЛИС, которого нет у других, и для некоторых запросов требуется такое ускорение.

Кроме того, работники самостоятельно балансируют очередь. Это одна из форм планирования, но есть и другие модели. (например, фермер хочет сохранить контроль над распределением рабочей силы). Или фермер должен остановить одного работника и заставить остальных работать (например, остановленная система будет отключена для технического обслуживания).

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>                     // for execlp
#include <mqueue.h>                     // for mq

typedef unsigned int u32;

typedef struct {
    u32 seqno;                          // sequence number
    int toval;                          // destination id
    int fmval;                          // responder worker id
} request_t;

char *pgmname;
int opt_b;                              // 1=broadcast
int opt_i;                              // 1=ignore errors
int opt_x;                              // 1=do execlp
int opt_s;                              // number of ms to sleep
int opt_S;                              // sequence maximum
int opt_W;                              // worker xid

#define WORKNR      3
#define MAXMSG      10

char mqfile_to_farmer[80];
mqd_t respQueue;

char mqfile_to_worker[80];
mqd_t reqQueue;

struct mq_attr attr;

pid_t ppid;
pid_t curpid;
pid_t pidrcvr;

// per-worker control
typedef struct {
    int wk_xid;
    pid_t wk_pid;
    mqd_t wk_req;
    u32 wk_seqno;
    char wk_mqfile[80];
} worker_t;
worker_t worklist[WORKNR];

#define FORALL_WK \
    wk = &worklist[0];  wk < &worklist[WORKNR];  ++wk

#define sysfault(_fmt...) \
    do { \
        printf(_fmt); \
        if (ppid) \
            kill(ppid,SIGUSR1); \
        exit(1); \
    } while (0)

void
_sysfault(void)
{

    __asm__ __volatile__("" :::);
}

#define logprt(_fmt...) \
    do { \
        int sverr = errno; \
        _logprt(); \
        printf(_fmt); \
        errno = sverr; \
    } while (0)

int logxid;
double logzero;

void
loginit(int xid)
{

    logxid = xid;
}

void
_logprt(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);
    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    if (logzero == 0)
        logzero = sec;

    sec -= logzero;

    switch (logxid) {
    case WORKNR:
        printf("%.9f LOG F: ",sec);
        break;
    case WORKNR + 1:
        printf("%.9f LOG R: ",sec);
        break;
    default:
        printf("%.9f LOG W%d: ",sec,logxid);
        break;
    }
}

void
logexit(int code)
{

    exit(code);
}

void
allwait(void)
{
    worker_t *wk;

    // wait for farmer's receiver to complete
    if (pidrcvr) {
        logprt("farmer: waiting for receiver to finish ...\n");
        waitpid(pidrcvr,NULL,0);
        pidrcvr = 0;
    }

    for (FORALL_WK) {
        if (wk->wk_pid) {
            logprt("farmer: waiting for worker %d to finish ...\n",wk->wk_xid);
            waitpid(wk->wk_pid,NULL,0);
            wk->wk_pid = 0;
        }

        if (opt_b)
            continue;

        logprt("farmer: closing and removing worker queue ...\n");
        mq_close(wk->wk_req);
        mq_unlink(wk->wk_mqfile);
    }
}

void
sighdr(int signo)
{
    worker_t *wk;

    switch (signo) {
    case SIGUSR1:  // request to master
        logprt("sighdr: got master stop signal ...\n");

        if (pidrcvr)
            kill(pidrcvr,SIGUSR2);

        for (FORALL_WK) {
            if (wk->wk_pid)
                kill(wk->wk_pid,SIGUSR2);
        }

        allwait();
        logprt("farmer: abnormal termination\n");

        logexit(1);
        break;

    case SIGUSR2:  // request to slaves
        logexit(1);
        break;
    }
}

void
reqopen(mqd_t *fdp,const char *file,int flag)
{
    mqd_t fd;
    int err;

    attr.mq_maxmsg = MAXMSG;
    attr.mq_msgsize = sizeof(request_t);

    fd = *fdp;
    if (fd >= 0)
        mq_close(fd);

    fd = mq_open(file,flag | O_CREAT,0600,&attr);
    if (fd < 0)
        sysfault("reqopen: %s open fault -- %s\n",file,strerror(errno));

    err = mq_getattr(fd,&attr);
    if (err < 0)
        sysfault("reqopen: %s getattr fault -- %s\n",file,strerror(errno));

    if (attr.mq_msgsize != sizeof(request_t))
        sysfault("reqopen: %s size fault -- mq_msgsize=%ld siz=%ld\n",
            file,attr.mq_msgsize,sizeof(request_t));

    logprt("reqopen: open -- file='%s' fd=%d\n",file,fd);

    *fdp = fd;
}

void worker(int execflg);

void
farmer(void)
{
    request_t req;
    request_t resp;
    ssize_t sent;
    worker_t *wk;
    u32 seqno;
    int xid;

    ppid = getpid();
    curpid = ppid;
    loginit(WORKNR);

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
    sprintf(mqfile_to_worker,"/mq_request_%d",ppid);

    respQueue = -1;
    reqopen(&respQueue,mqfile_to_farmer,O_RDONLY | O_CREAT | O_EXCL);

    reqQueue = -1;
    if (opt_b)
        reqopen(&reqQueue,mqfile_to_worker,O_WRONLY | O_CREAT | O_EXCL);

    // create the separate request queues
    xid = 0;
    for (FORALL_WK) {
        wk->wk_xid = xid++;

        if (opt_b) {
            logprt("farmer: common request queue -- reqQueue=%d\n",reqQueue);
            wk->wk_req = reqQueue;
            continue;
        }

        sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,wk->wk_xid);

        wk->wk_req = -1;
        reqopen(&wk->wk_req,wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL);
        logprt("farmer: separate request queue -- wk_req=%d\n",wk->wk_req);
    }

    // fork the workers
    for (FORALL_WK) {
        pid_t pid = fork();

        if (pid < 0)
            sysfault("farmer: fork fault -- %s\n",strerror(errno));

        if (pid != 0) {
            wk->wk_pid = pid;
            continue;
        }

        // NOTE/FIX: exec* takes strings so this is the correct way
        if (opt_x) {
            char opt[2][20];

            sprintf(opt[0],"-b%d",opt_b);
            sprintf(opt[1],"-W%d",wk->wk_xid);

            execlp(pgmname,pgmname,opt[0],opt[1],NULL);
            sysfault("farmer: execlp error -- %s\n",strerror(errno));
        }

        // simulate what exec would do -- call it direct
        opt_W = wk->wk_xid;
        worker(0);
    }

    pidrcvr = fork();
    if (pidrcvr < 0)
        sysfault("farmer: fork2 error -- %s\n",strerror(errno));

    // receiving done here
    if (pidrcvr == 0) {
        curpid = getpid();
        loginit(WORKNR + 1);

        for (int i = 0; i < (WORKNR * opt_S); i++) {
            // read the messages from the worker queue
            ssize_t received = mq_receive(respQueue,(char *) &resp,
                sizeof(resp),NULL);

            wk = &worklist[resp.fmval];
            logprt("received worker response: length %d fmval=%d seqno=%u wk_seqno=%u\n",
                (int) received,resp.fmval,resp.seqno,wk->wk_seqno);

            if (received < 0) {
                if (! opt_i)
                    sysfault("farmer: received fault -- %s\n",strerror(errno));
            }

            if (resp.seqno != wk->wk_seqno) {
                logprt("sequence fault\n");
                if (! opt_i)
                    sysfault("farmer: sequence fault\n");
            }

            ++wk->wk_seqno;
        }

        // send stop to worker processes
        for (FORALL_WK) {
            req.toval = -1;
            sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
            logprt("Farmer sent stop -- wk_xid=%d sent=%d\n",
                wk->wk_xid,(int) sent);

            if (sent < 0) {
                if (! opt_i)
                    sysfault("farmer: send fault on stop -- %s\n",
                        strerror(errno));
            }
        }

        // exit the farmer's receiver
        logprt("farmer: receiver exiting ...\n");
        logexit(0);
    }

    // sending done here
    else {
        for (seqno = 0;  seqno < opt_S;  ++seqno) {
            for (FORALL_WK) {
                wk->wk_seqno = seqno;
                req.seqno = seqno;
                req.toval = wk->wk_xid;

                sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
                logprt("Farmer sent to wk_xid=%d wk_req=%d -- sent=%d\n",
                    wk->wk_xid,wk->wk_req,(int) sent);
                if (sent < 0) {
                    if (! opt_i)
                        sysfault("farmer: send fault -- %s\n",strerror(errno));
                }
            }
        }
    }

    mq_close(respQueue);

    // wait for all workers to complete
    allwait();

    // clean up the message queues
    mq_unlink(mqfile_to_farmer);

    logprt("farmer: complete\n");
    logexit(0);
}

void
worker(int execflg)
{
    request_t req;
    request_t resp;
    ssize_t sent;
    u32 seqno;
    int slpcnt;

    if (execflg)
        ppid = getppid();
    curpid = getpid();

    loginit(opt_W);
    logprt("worker: my index is %d ...\n",opt_W);

    attr.mq_maxmsg = MAXMSG;

    sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
    reqopen(&respQueue,mqfile_to_farmer,O_WRONLY);

    if (opt_b)
        sprintf(mqfile_to_worker,"/mq_request_%d",ppid);
    else
        sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);
    reqopen(&reqQueue,mqfile_to_worker,O_RDONLY);

    seqno = 0;

    slpcnt = opt_s;
    slpcnt *= 1000;
    slpcnt *= opt_W;

    while (1) {
        if (slpcnt > 0) {
            logprt("sleep %d\n",slpcnt);
            usleep(slpcnt);
            slpcnt = 0;
        }

        // receiving
        errno = 0;
        ssize_t received = mq_receive(reqQueue,(char *) &req,
            sizeof(req),NULL);

        logprt("received length %d -- seqno=%u toval=%d\n",
            (int) received,req.seqno,req.toval);

        if (received < 0)
            sysfault("worker: mq_receive fault -- %s\n",strerror(errno));

        // received stop signal
        if (req.toval < 0) {
            logprt("stopping ...\n");
            break;
        }

        if (req.toval != opt_W) {
            logprt("misroute\n");
            if (! opt_i)
                sysfault("worker: misroute fault\n");
        }

        if (req.seqno != seqno) {
            logprt("sequence fault\n");
            if (! opt_i)
                sysfault("worker: sequence fault\n");
        }

        // do something with request data
        resp.seqno = req.seqno;
        resp.toval = req.toval;
        resp.fmval = opt_W;

        // send response
        errno = 0;
        sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);

        logprt("sent response with length %d -- seqno=%u toval=%d\n",
            (int) sent,req.seqno,resp.toval);

        // HERE &resp is always different (doesn't print)
        if (sent < 0)
            sysfault("worker: mq_send fault -- %s\n",strerror(errno));

        ++seqno;
    }

    mq_close(reqQueue);
    mq_close(respQueue);

    logexit(0);
}

int
main(int argc,char **argv)
{
    char *cp;

    pgmname = argv[0];

    --argc;
    ++argv;

    opt_W = -1;
    opt_S = 3;

    reqQueue = -1;
    respQueue = -1;

    signal(SIGUSR1,sighdr);
    signal(SIGUSR2,sighdr);

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'b':  // broadcast mode (single request queue)
            cp += 2;
            opt_b = (*cp != 0) ? atoi(cp) : 1;
            break;

        case 'i':  // ignore errors
            cp += 2;
            opt_i = (*cp != 0) ? atoi(cp) : 1;
            break;

        case 'S':  // sequence maximum
            cp += 2;
            opt_S = (*cp != 0) ? atoi(cp) : 3;
            break;

        case 's':  // sleep mode (milliseconds)
            cp += 2;
            opt_s = (*cp != 0) ? atoi(cp) : 3;
            break;

        case 'W':  // worker number
            cp += 2;
            opt_W = atoi(cp + 2);
            break;

        case 'x':  // use execlp
            opt_x = ! opt_x;
            break;
        }
    }

    if (opt_W >= 0)
        worker(1);
    else
        farmer();

    return 0;
}
Другие вопросы по тегам