mqueue получает неверные данные
Ниже приведен код для задания по переработке сельхозпродукции. Основное внимание уделяется комментариям: "ЗДЕСЬ $ или всегда одинаковые / разные". Это моя проблема: когда рабочий процесс выполняет свою работу и отправляет данные ответа фермеру, фермер всегда получает одни и те же данные ответа (один и тот же адрес указателя), даже если рабочий каждый раз отправляет разные данные.
Пример: рабочие отправляют данные по адресам: 0x7fff42318a90
,0x7ffddba97390
,0x7ffc69e8e060
и т.д., и фермер продолжает получать данные только с одного адреса 0x7ffdb1496f30
Я сделал все возможное, чтобы абстрагировать код и задавать как можно больше вопросов. Если я пропустил важную информацию, пожалуйста, дайте мне знать, я новичок в программировании управления процессами, и я мог бы использовать некоторые рекомендации.
ОБНОВЛЕНИЕ: также печать содержимого resp
са resp.b
где b
Целое число возвращает одно и то же значение, даже если это значение отличается в рабочем.
ОБНОВЛЕНИЕ: я попытался написать какой-нибудь исполняемый код только на этот раз, если рабочий не получал.
// как у фермера, так и у рабочего
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq
typedef struct{
int a;
} REQUEST;
typedef struct{
int b;
} RESPONSE;
static char mq_farmer[80];
static char mq_worker[80];
// фермер:
int main (int argc, char * argv[])
{
REQUEST req;
RESPONSE resp;
sprintf (mq_farmer, "/mq_request_%s_%d", "foo", getpid());
sprintf (mq_worker, "/mq_response_%s_%d", "bar", getpid());
//define attr
struct mq_attr attr;
attr.mq_maxmsg= 10;
attr.mq_msgsize = sizeof(REQUEST);
mqd_t reqQueue = mq_open(mq_farmer, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);
attr.mq_msgsize = sizeof(RESPONSE);
mqd_t respQueue = mq_open(mq_worker, O_WRONLY | O_CREAT | O_EXCL, 0600, &attr);
// * create the child processes (see process_test() and message_queue_test())
int i;
for(i = 0; i < 3; i++)
{
pid_t processID = fork();
if(processID < 0)
{
//error
}
else if(processID == 0)
{
//some code
execlp("./worker","worker", getpid(), i, NULL);
}
}
pid_t pid = fork();
if(pid < 0)
{
//error
}
else
{
if(pid == 0) //receiving done here
{
for(i = 0; i < 3; i++)
{
// read the messages from the worker queue
mqd_t received = mq_receive (respQueue, (char *) &resp, sizeof(resp), NULL);
printf("Farmer received worker response: %p\n with value %d\n", &resp, resp.b);
//HERE &resp is always the same
}
// end worker process
req.a = -1;
mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);
}
else //sending done here
{
for(i = 0; i < 3; i++)
{
req.a = i;
mqd_t sent = mq_send(reqQueue, (char *) &req,sizeof(req), 0);
}
}
}
waitpid(pid, NULL, 0);
mq_close(reqQueue);
mq_close(respQueue);
//clean up the message queues
mq_unlink(mq_farmer);
mq_unlink(mq_worker);
return 0;
}
// Рабочий:
int main (int argc, char * argv[])
{
REQUEST req;
RESPONSE resp;
int arg1;
sscanf(argv[1], "%d", &arg1);
sprintf (mq_farmer, "/mq_request_%s_%d", "foo", arg1);
sprintf (mq_worker, "/mq_response_%s_%d", "bar",arg1);
mqd_t reqQueue = mq_open (mq_farmer, O_RDONLY);
mqd_t respQueue = mq_open (mq_worker, O_WRONLY);
while (true){
//receiving
mqd_t received = mq_receive (reqQueue, (char *) &req,
sizeof(req), NULL);
printf("Worker received %p with value %d\n", &req, req.a);
//received stop signal
if(req.a < 0){
printf("stopping worker\n");
break;
}
//waiting for farmer to fork
sleep(3);
//do something with request data
resp.b = req.a;
//send response
mqd_t sent = mq_send (respQueue, (char *) &resp,
sizeof (resp), NULL);
printf("Worker sent response: %p\n", &resp);
//HERE &resp is always different (doesn't print)
}
mq_close(reqQueue);
mq_close(respQueue);
//clean up the message queues
mq_unlink(mq_farmer);
mq_unlink(mq_worker);
return 0;
}
1 ответ
Когда вы звоните mq_receive
он помещает данные в буфер, на который указывает второй аргумент, который вы задаете как &resp
, Это не меняет сам указатель.
&resp
является фиксированным адресом в родительском элементе, если только вы его не измените, что маловероятно из размещенного кода [который не показывает определения resp
], так:
printf("Received worker response: %p\n", &resp);
Вы всегда получите одно и то же значение.
Что вы [вероятно] хотите сделать, это напечатать что resp
содержит
ОБНОВИТЬ:
Хорошо, было еще несколько ошибок.
Большая ошибка заключается в том, что, хотя у вас может быть одна очередь для сообщений от рабочего к фермеру (то есть очередь ответов), вы не можете использовать одну очередь для запросов к работникам. Каждому из них нужна своя очередь запросов.
В противном случае один работник может принять / монополизировать все запросы, даже те, которые принадлежат другим. Если это произойдет, фермер, скорее всего, увидит сообщения с печатью только от этого работника.
Это то, что вы видите, потому что первый рабочий [вероятно, № 0] имеет mq_receive
завершить первым. Это так быстро, что он делает все mq_receive/mq_send
прежде чем другие смогут добраться до них.
Затем он увидит сообщение "стоп" и выйдет. Если другим "повезло", первый работник оставил оставшиеся сообщения об остановке в очереди. Но сообщений о запросах нет, поэтому они никогда не отправляют ответ.
Кроме того, очередь ответа была открыта фермером с O_WRONLY
вместо O_RDONLY
,
Я подготовил две версии вашей программы. Один с аннотациями для ошибок. Другой, который вычищен и работает.
Вот аннотированная версия [прошу прощения за беспричинную очистку стиля]:
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq
typedef struct {
int a;
} REQUEST;
typedef struct {
int b;
} RESPONSE;
char *pgmname;
static char mq_farmer[80];
static char mq_worker[80];
int
main(int argc,char **argv)
{
REQUEST req;
RESPONSE resp;
ssize_t sent;
pgmname = argv[0];
--argc;
++argv;
sprintf(mq_farmer,"/mq_request_%s_%d","foo",getpid());
sprintf(mq_worker,"/mq_response_%s_%d","bar",getpid());
// define attr
// NOTE/BUG: this can have random data in it
struct mq_attr attr;
attr.mq_maxmsg = 10;
// NOTE/BUG: this is _the_ big one -- we're only doing a single request
// queue -- each worker needs its _own_ request queue -- otherwise, a
// single worker can _monopolize_ all messages for the other workers
attr.mq_msgsize = sizeof(REQUEST);
mqd_t reqQueue = mq_open(mq_farmer,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);
// NOTE/BUG: this should be opened for reading
attr.mq_msgsize = sizeof(RESPONSE);
mqd_t respQueue = mq_open(mq_worker,O_WRONLY | O_CREAT | O_EXCL,0600,&attr);
// create the child processes (see process_test() and message_queue_test())
int i;
// NOTE/BUG: we must remember the child pid numbers so we can do waitpid
// later
for (i = 0; i < 3; i++) {
pid_t processID = fork();
if (processID < 0) {
// error
}
else if (processID == 0) {
// some code
// NOTE/BUG: exec* takes strings so this is wrong
execlp("./worker","worker",getpid(),i,NULL);
}
}
// NOTE/BUG: on all mq_send/mq_receive, the return type is ssize_t and
// _not_ mqd_t
pid_t pid = fork();
if (pid < 0) {
// error
}
else {
// receiving done here
if (pid == 0) {
for (i = 0; i < 3; i++) {
// read the messages from the worker queue
ssize_t received = mq_receive(respQueue,(char *) &resp,
sizeof(resp),NULL);
printf("Farmer received worker response: %p with length %ld value %d\n",
&resp,received,resp.b);
// HERE &resp is always the same
}
// end worker process
req.a = -1;
sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
printf("Farmer sent stop -- sent=%ld\n",sent);
// NOTE/BUG: we need to exit here
}
// sending done here
else {
for (i = 0; i < 3; i++) {
req.a = i;
sent = mq_send(reqQueue,(char *) &req,sizeof(req),0);
printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
}
}
}
// NOTE/BUG: we're waiting on the double fork farmer, but _not_
// on the actual worker pids
waitpid(pid,NULL,0);
mq_close(reqQueue);
mq_close(respQueue);
// clean up the message queues
mq_unlink(mq_farmer);
mq_unlink(mq_worker);
return 0;
}
int
worker_main(int argc,char *argv[])
{
REQUEST req;
RESPONSE resp;
ssize_t sent;
int arg1;
// NOTE/BUG: use getppid instead
sscanf(argv[1],"%d",&arg1);
printf("worker: my index is %d ...\n",arg1);
sprintf(mq_farmer,"/mq_request_%s_%d","foo",arg1);
sprintf(mq_worker,"/mq_response_%s_%d","bar",arg1);
mqd_t reqQueue = mq_open(mq_farmer,O_RDONLY);
mqd_t respQueue = mq_open(mq_worker,O_WRONLY);
while (1) {
// receiving
ssize_t received = mq_receive(reqQueue,(char *) &req,
sizeof(req),NULL);
printf("Worker received %p with length %ld value %d\n",
&req,received,req.a);
// received stop signal
if (req.a < 0) {
printf("stopping worker\n");
break;
}
// waiting for farmer to fork
sleep(3);
// do something with request data
resp.b = req.a;
// send response
// NOTE/BUG: last argument is unsigned int and _not_ pointer
#if 0
sent = mq_send(respQueue,(char *) &resp,sizeof(resp),NULL);
#else
sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
#endif
printf("Worker sent response %p with length %ld value %d\n",
&req,sent,req.a);
// HERE &resp is always different (doesn't print)
}
mq_close(reqQueue);
mq_close(respQueue);
// clean up the message queues
// NOTE/BUG: farmer should do this -- not worker
mq_unlink(mq_farmer);
mq_unlink(mq_worker);
return 0;
}
Вот очищенная и рабочая версия. Обратите внимание, что для простоты / простоты я объединил как фермерские, так и рабочие программы в одну, используя немного хитрости в main
:
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq
typedef struct {
int a;
} REQUEST;
typedef struct {
int b;
} RESPONSE;
char *pgmname;
int opt_x;
int opt_W;
#define WORKNR 3
char mqfile_to_farmer[80];
char mqfile_to_worker[80];
struct mq_attr attr;
pid_t ppid;
// per-worker control
struct worker {
pid_t wk_pid;
mqd_t wk_req;
char wk_mqfile[80];
};
struct worker worklist[WORKNR];
void
worker(void)
{
REQUEST req;
RESPONSE resp;
ssize_t sent;
ppid = getppid();
printf("worker: my index is %d ...\n",opt_W);
sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);
mqd_t reqQueue = mq_open(mqfile_to_worker,O_RDONLY);
mqd_t respQueue = mq_open(mqfile_to_farmer,O_WRONLY);
while (1) {
// receiving
errno = 0;
ssize_t received = mq_receive(reqQueue,(char *) &req,
sizeof(req),NULL);
printf("Worker %d received %p with length %ld value %d -- %s\n",
opt_W,&req,received,req.a,strerror(errno));
if (received < 0)
exit(77);
// received stop signal
if (req.a < 0) {
printf("stopping worker\n");
break;
}
// do something with request data
resp.b = req.a;
// send response
errno = 0;
sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
printf("Worker %d sent response %p with length %ld value %d -- %s\n",
opt_W,&req,sent,req.a,strerror(errno));
// HERE &resp is always different (doesn't print)
if (sent < 0)
exit(78);
}
mq_close(reqQueue);
mq_close(respQueue);
exit(0);
}
void
farmer(void)
{
REQUEST req;
RESPONSE resp;
ssize_t sent;
struct worker *wk;
ppid = getpid();
sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
attr.mq_maxmsg = 10;
attr.mq_msgsize = sizeof(REQUEST);
mqd_t respQueue = mq_open(mqfile_to_farmer,
O_RDONLY | O_CREAT | O_EXCL,0600,&attr);
if (respQueue < 0) {
printf("farmer: respQueue open fault -- %s\n",strerror(errno));
exit(1);
}
// create the child processes (see process_test() and message_queue_test())
int i;
// create the separate request queues
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
attr.mq_msgsize = sizeof(RESPONSE);
sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,i);
wk->wk_req = mq_open(wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL,0600,
&attr);
if (wk->wk_req < 0) {
printf("farmer: wk_req open fault -- %s\n",strerror(errno));
exit(1);
}
}
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
pid_t pid = fork();
if (pid < 0) {
perror("fork");
exit(9);
}
if (pid != 0) {
wk->wk_pid = pid;
continue;
}
// NOTE/FIX: exec* takes strings so this is the correct way
if (opt_x) {
char xid[20];
sprintf(xid,"-W%d",i);
execlp(pgmname,pgmname,xid,NULL);
perror("execlp");
exit(7);
}
// simulate what exec would do -- call it direct
opt_W = i;
worker();
}
pid_t pid = fork();
if (pid < 0) {
perror("fork2");
exit(5);
}
// receiving done here
if (pid == 0) {
for (i = 0; i < WORKNR; i++) {
// read the messages from the worker queue
ssize_t received = mq_receive(respQueue,(char *) &resp,
sizeof(resp),NULL);
printf("Farmer received worker response: %p with length %ld value %d\n",
&resp,received,resp.b);
// HERE &resp is always the same
}
// end worker process
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
req.a = -1;
sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
printf("Farmer sent stop -- sent=%ld\n",sent);
}
// exit the farmer's receiver
printf("farmer: receiver exiting ...\n");
exit(0);
}
// sending done here
else {
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
req.a = i;
sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
printf("Farmer sent to i=%d -- sent=%ld\n",i,sent);
}
// wait for farmer's receiver to complete
printf("farmer: waiting for receiver to finish ...\n");
waitpid(pid,NULL,0);
}
mq_close(respQueue);
// wait for all workers to complete
for (i = 0; i < WORKNR; i++) {
wk = &worklist[i];
printf("farmer: waiting for worker to finish ...\n");
waitpid(wk->wk_pid,NULL,0);
mq_close(wk->wk_req);
mq_unlink(wk->wk_mqfile);
}
// clean up the message queues
mq_unlink(mqfile_to_farmer);
}
int
main(int argc,char **argv)
{
char *cp;
pgmname = argv[0];
--argc;
++argv;
opt_W = -1;
for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;
switch (cp[1]) {
case 'W':
opt_W = atoi(cp + 2);
break;
case 'x':
opt_x = ! opt_x;
break;
}
}
if (opt_W >= 0)
worker();
else
farmer();
return 0;
}
ОБНОВЛЕНИЕ № 2:
Вот версия, которая демонстрирует одну или несколько очередей запросов. Рабочие теперь проверяют идентификатор получателя в полученном сообщении, совпадая с номером их работника.
Если вы просто запустите его без параметров, вы получите несколько очередей и "хороший" вывод.
Если вы запустите его с -b
[и опционально -s
] вы получите единственную очередь запросов, и программа увидит неправильно доставленные сообщения (например, работник 0 получает сообщение, предназначенное для работника 1).
Единая очередь является подмножеством. Пока рабочие "равны", все в порядке. Но, если они этого не делают (например, один работник может делать то, что другие не могут), важно иметь возможность ставить в очередь правильного работника. Примером может служить сетевой узел, который имеет специальное аппаратное обеспечение для расчета с помощью ПЛИС, которого нет у других, и для некоторых запросов требуется такое ускорение.
Кроме того, работники самостоятельно балансируют очередь. Это одна из форм планирования, но есть и другие модели. (например, фермер хочет сохранить контроль над распределением рабочей силы). Или фермер должен остановить одного работника и заставить остальных работать (например, остановленная система будет отключена для технического обслуживания).
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h> // for execlp
#include <mqueue.h> // for mq
typedef unsigned int u32;
typedef struct {
u32 seqno; // sequence number
int toval; // destination id
int fmval; // responder worker id
} request_t;
char *pgmname;
int opt_b; // 1=broadcast
int opt_i; // 1=ignore errors
int opt_x; // 1=do execlp
int opt_s; // number of ms to sleep
int opt_S; // sequence maximum
int opt_W; // worker xid
#define WORKNR 3
#define MAXMSG 10
char mqfile_to_farmer[80];
mqd_t respQueue;
char mqfile_to_worker[80];
mqd_t reqQueue;
struct mq_attr attr;
pid_t ppid;
pid_t curpid;
pid_t pidrcvr;
// per-worker control
typedef struct {
int wk_xid;
pid_t wk_pid;
mqd_t wk_req;
u32 wk_seqno;
char wk_mqfile[80];
} worker_t;
worker_t worklist[WORKNR];
#define FORALL_WK \
wk = &worklist[0]; wk < &worklist[WORKNR]; ++wk
#define sysfault(_fmt...) \
do { \
printf(_fmt); \
if (ppid) \
kill(ppid,SIGUSR1); \
exit(1); \
} while (0)
void
_sysfault(void)
{
__asm__ __volatile__("" :::);
}
#define logprt(_fmt...) \
do { \
int sverr = errno; \
_logprt(); \
printf(_fmt); \
errno = sverr; \
} while (0)
int logxid;
double logzero;
void
loginit(int xid)
{
logxid = xid;
}
void
_logprt(void)
{
struct timespec ts;
double sec;
clock_gettime(CLOCK_REALTIME,&ts);
sec = ts.tv_nsec;
sec /= 1e9;
sec += ts.tv_sec;
if (logzero == 0)
logzero = sec;
sec -= logzero;
switch (logxid) {
case WORKNR:
printf("%.9f LOG F: ",sec);
break;
case WORKNR + 1:
printf("%.9f LOG R: ",sec);
break;
default:
printf("%.9f LOG W%d: ",sec,logxid);
break;
}
}
void
logexit(int code)
{
exit(code);
}
void
allwait(void)
{
worker_t *wk;
// wait for farmer's receiver to complete
if (pidrcvr) {
logprt("farmer: waiting for receiver to finish ...\n");
waitpid(pidrcvr,NULL,0);
pidrcvr = 0;
}
for (FORALL_WK) {
if (wk->wk_pid) {
logprt("farmer: waiting for worker %d to finish ...\n",wk->wk_xid);
waitpid(wk->wk_pid,NULL,0);
wk->wk_pid = 0;
}
if (opt_b)
continue;
logprt("farmer: closing and removing worker queue ...\n");
mq_close(wk->wk_req);
mq_unlink(wk->wk_mqfile);
}
}
void
sighdr(int signo)
{
worker_t *wk;
switch (signo) {
case SIGUSR1: // request to master
logprt("sighdr: got master stop signal ...\n");
if (pidrcvr)
kill(pidrcvr,SIGUSR2);
for (FORALL_WK) {
if (wk->wk_pid)
kill(wk->wk_pid,SIGUSR2);
}
allwait();
logprt("farmer: abnormal termination\n");
logexit(1);
break;
case SIGUSR2: // request to slaves
logexit(1);
break;
}
}
void
reqopen(mqd_t *fdp,const char *file,int flag)
{
mqd_t fd;
int err;
attr.mq_maxmsg = MAXMSG;
attr.mq_msgsize = sizeof(request_t);
fd = *fdp;
if (fd >= 0)
mq_close(fd);
fd = mq_open(file,flag | O_CREAT,0600,&attr);
if (fd < 0)
sysfault("reqopen: %s open fault -- %s\n",file,strerror(errno));
err = mq_getattr(fd,&attr);
if (err < 0)
sysfault("reqopen: %s getattr fault -- %s\n",file,strerror(errno));
if (attr.mq_msgsize != sizeof(request_t))
sysfault("reqopen: %s size fault -- mq_msgsize=%ld siz=%ld\n",
file,attr.mq_msgsize,sizeof(request_t));
logprt("reqopen: open -- file='%s' fd=%d\n",file,fd);
*fdp = fd;
}
void worker(int execflg);
void
farmer(void)
{
request_t req;
request_t resp;
ssize_t sent;
worker_t *wk;
u32 seqno;
int xid;
ppid = getpid();
curpid = ppid;
loginit(WORKNR);
sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
sprintf(mqfile_to_worker,"/mq_request_%d",ppid);
respQueue = -1;
reqopen(&respQueue,mqfile_to_farmer,O_RDONLY | O_CREAT | O_EXCL);
reqQueue = -1;
if (opt_b)
reqopen(&reqQueue,mqfile_to_worker,O_WRONLY | O_CREAT | O_EXCL);
// create the separate request queues
xid = 0;
for (FORALL_WK) {
wk->wk_xid = xid++;
if (opt_b) {
logprt("farmer: common request queue -- reqQueue=%d\n",reqQueue);
wk->wk_req = reqQueue;
continue;
}
sprintf(wk->wk_mqfile,"/mq_request_%d_%d",ppid,wk->wk_xid);
wk->wk_req = -1;
reqopen(&wk->wk_req,wk->wk_mqfile,O_WRONLY | O_CREAT | O_EXCL);
logprt("farmer: separate request queue -- wk_req=%d\n",wk->wk_req);
}
// fork the workers
for (FORALL_WK) {
pid_t pid = fork();
if (pid < 0)
sysfault("farmer: fork fault -- %s\n",strerror(errno));
if (pid != 0) {
wk->wk_pid = pid;
continue;
}
// NOTE/FIX: exec* takes strings so this is the correct way
if (opt_x) {
char opt[2][20];
sprintf(opt[0],"-b%d",opt_b);
sprintf(opt[1],"-W%d",wk->wk_xid);
execlp(pgmname,pgmname,opt[0],opt[1],NULL);
sysfault("farmer: execlp error -- %s\n",strerror(errno));
}
// simulate what exec would do -- call it direct
opt_W = wk->wk_xid;
worker(0);
}
pidrcvr = fork();
if (pidrcvr < 0)
sysfault("farmer: fork2 error -- %s\n",strerror(errno));
// receiving done here
if (pidrcvr == 0) {
curpid = getpid();
loginit(WORKNR + 1);
for (int i = 0; i < (WORKNR * opt_S); i++) {
// read the messages from the worker queue
ssize_t received = mq_receive(respQueue,(char *) &resp,
sizeof(resp),NULL);
wk = &worklist[resp.fmval];
logprt("received worker response: length %d fmval=%d seqno=%u wk_seqno=%u\n",
(int) received,resp.fmval,resp.seqno,wk->wk_seqno);
if (received < 0) {
if (! opt_i)
sysfault("farmer: received fault -- %s\n",strerror(errno));
}
if (resp.seqno != wk->wk_seqno) {
logprt("sequence fault\n");
if (! opt_i)
sysfault("farmer: sequence fault\n");
}
++wk->wk_seqno;
}
// send stop to worker processes
for (FORALL_WK) {
req.toval = -1;
sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
logprt("Farmer sent stop -- wk_xid=%d sent=%d\n",
wk->wk_xid,(int) sent);
if (sent < 0) {
if (! opt_i)
sysfault("farmer: send fault on stop -- %s\n",
strerror(errno));
}
}
// exit the farmer's receiver
logprt("farmer: receiver exiting ...\n");
logexit(0);
}
// sending done here
else {
for (seqno = 0; seqno < opt_S; ++seqno) {
for (FORALL_WK) {
wk->wk_seqno = seqno;
req.seqno = seqno;
req.toval = wk->wk_xid;
sent = mq_send(wk->wk_req,(char *) &req,sizeof(req),0);
logprt("Farmer sent to wk_xid=%d wk_req=%d -- sent=%d\n",
wk->wk_xid,wk->wk_req,(int) sent);
if (sent < 0) {
if (! opt_i)
sysfault("farmer: send fault -- %s\n",strerror(errno));
}
}
}
}
mq_close(respQueue);
// wait for all workers to complete
allwait();
// clean up the message queues
mq_unlink(mqfile_to_farmer);
logprt("farmer: complete\n");
logexit(0);
}
void
worker(int execflg)
{
request_t req;
request_t resp;
ssize_t sent;
u32 seqno;
int slpcnt;
if (execflg)
ppid = getppid();
curpid = getpid();
loginit(opt_W);
logprt("worker: my index is %d ...\n",opt_W);
attr.mq_maxmsg = MAXMSG;
sprintf(mqfile_to_farmer,"/mq_response_%d",ppid);
reqopen(&respQueue,mqfile_to_farmer,O_WRONLY);
if (opt_b)
sprintf(mqfile_to_worker,"/mq_request_%d",ppid);
else
sprintf(mqfile_to_worker,"/mq_request_%d_%d",ppid,opt_W);
reqopen(&reqQueue,mqfile_to_worker,O_RDONLY);
seqno = 0;
slpcnt = opt_s;
slpcnt *= 1000;
slpcnt *= opt_W;
while (1) {
if (slpcnt > 0) {
logprt("sleep %d\n",slpcnt);
usleep(slpcnt);
slpcnt = 0;
}
// receiving
errno = 0;
ssize_t received = mq_receive(reqQueue,(char *) &req,
sizeof(req),NULL);
logprt("received length %d -- seqno=%u toval=%d\n",
(int) received,req.seqno,req.toval);
if (received < 0)
sysfault("worker: mq_receive fault -- %s\n",strerror(errno));
// received stop signal
if (req.toval < 0) {
logprt("stopping ...\n");
break;
}
if (req.toval != opt_W) {
logprt("misroute\n");
if (! opt_i)
sysfault("worker: misroute fault\n");
}
if (req.seqno != seqno) {
logprt("sequence fault\n");
if (! opt_i)
sysfault("worker: sequence fault\n");
}
// do something with request data
resp.seqno = req.seqno;
resp.toval = req.toval;
resp.fmval = opt_W;
// send response
errno = 0;
sent = mq_send(respQueue,(char *) &resp,sizeof(resp),0);
logprt("sent response with length %d -- seqno=%u toval=%d\n",
(int) sent,req.seqno,resp.toval);
// HERE &resp is always different (doesn't print)
if (sent < 0)
sysfault("worker: mq_send fault -- %s\n",strerror(errno));
++seqno;
}
mq_close(reqQueue);
mq_close(respQueue);
logexit(0);
}
int
main(int argc,char **argv)
{
char *cp;
pgmname = argv[0];
--argc;
++argv;
opt_W = -1;
opt_S = 3;
reqQueue = -1;
respQueue = -1;
signal(SIGUSR1,sighdr);
signal(SIGUSR2,sighdr);
for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;
switch (cp[1]) {
case 'b': // broadcast mode (single request queue)
cp += 2;
opt_b = (*cp != 0) ? atoi(cp) : 1;
break;
case 'i': // ignore errors
cp += 2;
opt_i = (*cp != 0) ? atoi(cp) : 1;
break;
case 'S': // sequence maximum
cp += 2;
opt_S = (*cp != 0) ? atoi(cp) : 3;
break;
case 's': // sleep mode (milliseconds)
cp += 2;
opt_s = (*cp != 0) ? atoi(cp) : 3;
break;
case 'W': // worker number
cp += 2;
opt_W = atoi(cp + 2);
break;
case 'x': // use execlp
opt_x = ! opt_x;
break;
}
}
if (opt_W >= 0)
worker(1);
else
farmer();
return 0;
}