Чтение из множества неблокирующих именованных каналов в linux
Основываясь на похожем примере, расположенном здесь в stackru, у меня есть три именованных канала, pipe_a, pipe_b и pipe_c, которые поступают от внешних процессов. Я хотел бы иметь процесс чтения, который выводит на консоль, что бы ни записывалось в любой из этих каналов.
Приведенная ниже программа представляет собой комплексную программу c, которая должна читать три канала неблокирующим образом и отображать выходные данные, когда какой-либо из каналов получает новые данные.
Однако, это не работает - это блокирует! Если pipe_a получает данные, он отображает их, а затем ожидает поступления новых данных в pipe_b и т. Д.
Функция select() должна позволять отслеживать несколько файловых дескрипторов до тех пор, пока один из них не будет готов, и тогда мы должны перейти в функцию чтения канала и получить данные.
Может кто-нибудь помочь определить, почему трубы ведут себя так, как будто они находятся в режиме блокировки?
/*
* FIFO example using select.
*
* $ mkfifo /tmp/fifo
* $ clang -Wall -o test ./test.c
* $ ./test &
* $ echo 'hello' > /tmp/fifo
* $ echo 'hello world' > /tmp/fifo
* $ killall test
*/
#include <sys/types.h>
#include <sys/select.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
// globals
int fd_a, fd_b, fd_c;
int nfd_a, nfd_b, nfd_c;
fd_set set_a, set_b, set_c;
char buffer_a[100*1024];
char buffer_b[100*1024];
char buffer_c[100*1024];
int readPipeA()
{
ssize_t bytes;
size_t total_bytes;
if (FD_ISSET(fd_a, &set_a)) {
printf("\nDescriptor %d has new data to read.\n", fd_a);
total_bytes = 0;
for (;;) {
printf("\nDropped into read loop\n");
bytes = read(fd_a, buffer_a, sizeof(buffer_a));
if (bytes > 0) {
total_bytes += (size_t)bytes;
printf("%s", buffer_a);
} else {
if (errno == EWOULDBLOCK) {
printf("\ndone reading (%ul bytes)\n", total_bytes);
break;
} else {
perror("read");
return EXIT_FAILURE;
}
}
}
}
}
int readPipeB()
{
ssize_t bytes;
size_t total_bytes;
if (FD_ISSET(fd_b, &set_b)) {
printf("\nDescriptor %d has new data to read.\n", fd_b);
total_bytes = 0;
for (;;) {
printf("\nDropped into read loop\n");
bytes = read(fd_b, buffer_b, sizeof(buffer_b));
if (bytes > 0) {
total_bytes += (size_t)bytes;
printf("%s", buffer_b);
} else {
if (errno == EWOULDBLOCK) {
printf("\ndone reading (%ul bytes)\n", total_bytes);
break;
} else {
perror("read");
return EXIT_FAILURE;
}
}
}
}
}
int readPipeC()
{
ssize_t bytes;
size_t total_bytes;
if (FD_ISSET(fd_c, &set_c)) {
printf("\nDescriptor %d has new data to read.\n", fd_c);
total_bytes = 0;
for (;;) {
printf("\nDropped into read loop\n");
bytes = read(fd_c, buffer_c, sizeof(buffer_c));
if (bytes > 0) {
total_bytes += (size_t)bytes;
printf("%s", buffer_c);
} else {
if (errno == EWOULDBLOCK) {
printf("\ndone reading (%ul bytes)\n", total_bytes);
break;
} else {
perror("read");
return EXIT_FAILURE;
}
}
}
}
}
int main(int argc, char* argv[])
{
// create pipes to monitor (if they don't already exist)
system("mkfifo /tmp/PIPE_A");
system("mkfifo /tmp/PIPE_B");
system("mkfifo /tmp/PIPE_C");
// open file descriptors of named pipes to watch
fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
if (fd_a == -1) {
perror("open");
return EXIT_FAILURE;
}
FD_ZERO(&set_a);
FD_SET(fd_a, &set_a);
fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
if (fd_b == -1) {
perror("open");
return EXIT_FAILURE;
}
FD_ZERO(&set_b);
FD_SET(fd_b, &set_b);
fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
if (fd_c == -1) {
perror("open");
return EXIT_FAILURE;
}
FD_ZERO(&set_c);
FD_SET(fd_c, &set_c);
for(;;)
{
// check pipe A
nfd_a= select(fd_a+1, &set_a, NULL, NULL, NULL);
if (nfd_a) {
if (nfd_a == -1) {
perror("select");
return EXIT_FAILURE;
}
readPipeA();
}
// check pipe B
nfd_b= select(fd_b+1, &set_b, NULL, NULL, NULL);
if (nfd_b) {
if (nfd_b == -1) {
perror("select");
return EXIT_FAILURE;
}
readPipeB();
}
// check pipe C
nfd_c= select(fd_c+1, &set_c, NULL, NULL, NULL);
if (nfd_c) {
if (nfd_c == -1) {
perror("select");
return EXIT_FAILURE;
}
readPipeC();
}
}
return EXIT_SUCCESS;
}
--- Обновленный код ---
Модифицировал приложение, основываясь на отзывах здесь и прочтении:
/*
* FIFO example using select.
*
* $ mkfifo /tmp/fifo
* $ clang -Wall -o test ./test.c
* $ ./test &
* $ echo 'hello' > /tmp/fifo
* $ echo 'hello world' > /tmp/fifo
* $ killall test
*/
#include <sys/types.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
int readPipe(int fd)
{
ssize_t bytes;
size_t total_bytes = 0;
char buffer[100*1024];
printf("\nDropped into read pipe\n");
for(;;) {
bytes = read(fd, buffer, sizeof(buffer));
if (bytes > 0) {
total_bytes += (size_t)bytes;
printf("%s", buffer);
} else {
if (errno == EWOULDBLOCK) {
printf("\ndone reading (%d bytes)\n", (int)total_bytes);
break;
} else {
perror("read");
return EXIT_FAILURE;
}
}
}
return EXIT_SUCCESS;
}
int main(int argc, char* argv[])
{
int fd_a, fd_b, fd_c; // file descriptors for each pipe
int nfd; // select() return value
fd_set read_fds; // file descriptor read flags
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
// create pipes to monitor (if they don't already exist)
system("mkfifo /tmp/PIPE_A");
system("mkfifo /tmp/PIPE_B");
system("mkfifo /tmp/PIPE_C");
// open file descriptors of named pipes to watch
fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
if (fd_a == -1) {
perror("open");
return EXIT_FAILURE;
}
fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
if (fd_b == -1) {
perror("open");
return EXIT_FAILURE;
}
fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
if (fd_c == -1) {
perror("open");
return EXIT_FAILURE;
}
FD_ZERO(&read_fds);
FD_SET(fd_a, &read_fds); // add pipe to the read descriptor watch list
FD_SET(fd_b, &read_fds);
FD_SET(fd_c, &read_fds);
for(;;)
{
// check if there is new data in any of the pipes
nfd = select(fd_a+1, &read_fds, NULL, NULL, &tv);
if (nfd != 0) {
if (nfd == -1) {
perror("select");
return EXIT_FAILURE;
}
if (FD_ISSET(fd_a, &read_fds)) {
readPipe(fd_a);
}
}
nfd = select(fd_b+1, &read_fds, NULL, NULL, &tv);
if (nfd != 0) {
if (nfd == -1) {
perror("select");
return EXIT_FAILURE;
}
if (FD_ISSET(fd_b, &read_fds)){
readPipe(fd_b);
}
}
nfd = select(fd_c+1, &read_fds, NULL, NULL, &tv);
if (nfd != 0) {
if (nfd == -1) {
perror("select");
return EXIT_FAILURE;
}
if (FD_ISSET(fd_c, &read_fds)){
readPipe(fd_c);
}
}
usleep(10);
}
return EXIT_SUCCESS;
}
По-прежнему возникают проблемы с выбором, возвращающим ноль (0), когда в каком-либо из отслеживаемых каналов ожидают данные? Я не должен правильно использовать select() и fd_isset(). Вы видите, что я делаю не так? Благодарю.
4 ответа
Проблема в том, что функция выбора блокирует. Я понял select(), чтобы проверить флаги, чтобы увидеть, будет ли чтение "блокировать", если оно было выполнено, так что можно решить выполнить чтение или нет. Труба открывается в режимах RDWR и NONBLOCK.
Вы говорите, что проблема в том, что функция выбора блокирует, но продолжайте признавать, что NONBLOCK
flag только делает так, чтобы чтение блокировалось. Выбрать и прочитать две разные вещи.
O_NONBLOCK
флаг влияет на сокет (и, следовательно, ваш read
звонки); это не меняет поведение select
, который имеет свою семантику тайм-аута / блокировки.
man select
утверждает, что timeout
Аргумент с обоими числовыми элементами, установленными в ноль, производит неблокирующий опрос, тогда как аргумент тайм-аута NULL
может привести к неопределенному блоку:
Если параметр timeout является нулевым указателем, то вызов pselect() или select() должен блокироваться бесконечно, пока хотя бы один дескриптор не будет соответствовать указанным критериям. Чтобы выполнить опрос, параметр timeout не должен быть нулевым указателем и должен указывать на временную структуру с нулевым значением
timepec.
(NB. Текст далее вверх по странице указывает на то, что, хотя pselect()
занимает timespec
состав, select()
занимает timeval
состав; Я позволил себе применить эту логику к приведенной выше цитате.)
Итак, перед каждым select
вызов построить timeval
установите его членов на ноль и передайте это select
,
Пара замечаний, пока мы здесь:
В идеале у вас был бы только один
select
вызов, проверяя все три файловых дескриптора одновременно, а затем решая, какие каналыread
от проверки вашего набора FD сfd_isset
;Я также предлагаю положить немного
usleep
в конце вашего тела цикла, в противном случае ваша программа будет вращаться действительно очень быстро, когда не хватает данных.
Я взял фрагмент, который использовал для программирования сокетов, но он должен работать так же для именованных каналов. Это должно быть просто и легко следовать.
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cctype>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
int main()
{
fd_set readSet, writeSet, exSet;
struct timeval tv;
int i;
int fifoFds[3];
//open files or named pipes and put them into fifoFds array
while(1)
{
FD_ZERO(&readSet);
FD_ZERO(&writeSet); //not used
FD_ZERO(&exSet); //not used
int maxfd = -1;
for(i = 0; i < 3; i++)
{
if(maxfd == -1 || fifoFds[i] > maxfd)
maxfd = fifoFds[i];
FD_SET(fifoFds[i], &readSet);
}
tv.tv_sec = 1; //wait 1 second in select, change these as needed
tv.tv_usec = 0; //this is microseconds
select(maxfd+1, &readSet, &writeSet, &exSet, &tv);
for(i = 0; i < 3; i++)
{
if(FD_ISSET(fifoFds[i], &readSet))
{
//Read from that fifo now!
}
}
}
return 0;
}
Вот мое рабочее решение для чтения трех именованных каналов. Он может быть оптимизирован несколькими способами, но, как написано, это должно быть совершенно очевидно для всех, кому необходимо это сделать:
#include <sys/types.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
int readPipe(int fd)
{
ssize_t bytes;
size_t total_bytes = 0;
char buffer[100*1024];
printf("\nReading pipe descriptor # %d\n",fd);
for(;;) {
bytes = read(fd, buffer, sizeof(buffer));
if (bytes > 0) {
total_bytes += (size_t)bytes;
printf("%s", buffer);
} else {
if (errno == EWOULDBLOCK) {
break;
} else {
perror("read error");
return EXIT_FAILURE;
}
}
}
return EXIT_SUCCESS;
}
int main(int argc, char* argv[])
{
int fd_a, fd_b, fd_c; // file descriptors for each pipe
int nfd; // select() return value
fd_set read_fds; // file descriptor read flags
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
// create pipes to monitor (if they don't already exist)
system("mkfifo /tmp/PIPE_A");
system("mkfifo /tmp/PIPE_B");
system("mkfifo /tmp/PIPE_C");
// open file descriptors of named pipes to watch
fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
if (fd_a == -1) {
perror("open error");
return EXIT_FAILURE;
}
fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
if (fd_b == -1) {
perror("open error");
return EXIT_FAILURE;
}
fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
if (fd_c == -1) {
perror("open error");
return EXIT_FAILURE;
}
for(;;)
{
// clear fds read flags
FD_ZERO(&read_fds);
// check if there is new data in any of the pipes
// PIPE_A
FD_SET(fd_a, &read_fds);
nfd = select(fd_a+1, &read_fds, NULL, NULL, &tv);
if (nfd != 0) {
if (nfd == -1) {
perror("select error");
return EXIT_FAILURE;
}
if (FD_ISSET(fd_a, &read_fds)) {
readPipe(fd_a);
}
}
// PIPE_B
FD_SET(fd_b, &read_fds);
nfd = select(fd_b+1, &read_fds, NULL, NULL, &tv);
if (nfd != 0) {
if (nfd == -1) {
perror("select error");
return EXIT_FAILURE;
}
if (FD_ISSET(fd_b, &read_fds)){
readPipe(fd_b);
}
}
// PIPE_C
FD_SET(fd_c, &read_fds);
nfd = select(fd_c+1, &read_fds, NULL, NULL, &tv);
if (nfd != 0) {
if (nfd == -1) {
perror("select error");
return EXIT_FAILURE;
}
if (FD_ISSET(fd_c, &read_fds)){
readPipe(fd_c);
}
}
usleep(100000);
}
return EXIT_SUCCESS;
}
Просто для того, чтобы сделать ваш код проще. Вам не нужно три выбора. Вы можете установить все бесплатные файловые дескрипторы с тремя вызовами FD_SET()
, выберите вызов, и if nfd > 0
проверить каждый fd_x with FD_ISSET()
,