Чтение из множества неблокирующих именованных каналов в linux

Основываясь на похожем примере, расположенном здесь в stackru, у меня есть три именованных канала, pipe_a, pipe_b и pipe_c, которые поступают от внешних процессов. Я хотел бы иметь процесс чтения, который выводит на консоль, что бы ни записывалось в любой из этих каналов.

Приведенная ниже программа представляет собой комплексную программу c, которая должна читать три канала неблокирующим образом и отображать выходные данные, когда какой-либо из каналов получает новые данные.

Однако, это не работает - это блокирует! Если pipe_a получает данные, он отображает их, а затем ожидает поступления новых данных в pipe_b и т. Д.

Функция select() должна позволять отслеживать несколько файловых дескрипторов до тех пор, пока один из них не будет готов, и тогда мы должны перейти в функцию чтения канала и получить данные.

Может кто-нибудь помочь определить, почему трубы ведут себя так, как будто они находятся в режиме блокировки?

/*
 * FIFO example using select.
 *
 * $ mkfifo /tmp/fifo
 * $ clang -Wall -o test ./test.c
 * $ ./test &
 * $ echo 'hello' > /tmp/fifo
 * $ echo 'hello world' > /tmp/fifo
 * $ killall test
 */

#include <sys/types.h>
#include <sys/select.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>


// globals
int fd_a, fd_b, fd_c;
int nfd_a, nfd_b, nfd_c;
fd_set set_a, set_b, set_c;
char buffer_a[100*1024];
char buffer_b[100*1024];
char buffer_c[100*1024];


int readPipeA()
{
ssize_t bytes;
size_t total_bytes;

if (FD_ISSET(fd_a, &set_a)) {
    printf("\nDescriptor %d has new data to read.\n", fd_a);
    total_bytes = 0;
    for (;;) {
        printf("\nDropped into read loop\n");
        bytes = read(fd_a, buffer_a, sizeof(buffer_a));
        if (bytes > 0) {
            total_bytes += (size_t)bytes;
            printf("%s", buffer_a);
        } else {
            if (errno == EWOULDBLOCK) {
                printf("\ndone reading (%ul bytes)\n", total_bytes);
                break;
            } else {
                perror("read");
                return EXIT_FAILURE;
            }
        }
    }
}
}

int readPipeB()
{
ssize_t bytes;
size_t total_bytes;

if (FD_ISSET(fd_b, &set_b)) {
    printf("\nDescriptor %d has new data to read.\n", fd_b);
    total_bytes = 0;
    for (;;) {
        printf("\nDropped into read loop\n");
        bytes = read(fd_b, buffer_b, sizeof(buffer_b));
        if (bytes > 0) {
            total_bytes += (size_t)bytes;
            printf("%s", buffer_b);
        } else {
            if (errno == EWOULDBLOCK) {
                printf("\ndone reading (%ul bytes)\n", total_bytes);
                break;
            } else {
                perror("read");
                return EXIT_FAILURE;
            }
        }
    }
}
}

int readPipeC()
{
ssize_t bytes;
size_t total_bytes;

if (FD_ISSET(fd_c, &set_c)) {
    printf("\nDescriptor %d has new data to read.\n", fd_c);
    total_bytes = 0;
    for (;;) {
        printf("\nDropped into read loop\n");
        bytes = read(fd_c, buffer_c, sizeof(buffer_c));
        if (bytes > 0) {
            total_bytes += (size_t)bytes;
            printf("%s", buffer_c);
        } else {
            if (errno == EWOULDBLOCK) {
                printf("\ndone reading (%ul bytes)\n", total_bytes);
                break;
            } else {
                perror("read");
                return EXIT_FAILURE;
            }
        }
    }
}
}


int main(int argc, char* argv[])
    {


    // create pipes to monitor (if they don't already exist)
    system("mkfifo /tmp/PIPE_A");
    system("mkfifo /tmp/PIPE_B");
    system("mkfifo /tmp/PIPE_C");


    // open file descriptors of named pipes to watch
    fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
    if (fd_a == -1) {
    perror("open");
    return EXIT_FAILURE;
    }
    FD_ZERO(&set_a);
    FD_SET(fd_a, &set_a);


    fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
    if (fd_b == -1) {
    perror("open");
    return EXIT_FAILURE;
    }
    FD_ZERO(&set_b);
    FD_SET(fd_b, &set_b);


    fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
    if (fd_c == -1) {
    perror("open");
    return EXIT_FAILURE;
    }
    FD_ZERO(&set_c);
    FD_SET(fd_c, &set_c);



    for(;;)
    {
        // check pipe A
        nfd_a= select(fd_a+1, &set_a, NULL, NULL, NULL);
        if (nfd_a) {
            if (nfd_a == -1) {
                perror("select");
                return EXIT_FAILURE;
            }
            readPipeA();
        }

        // check pipe B
        nfd_b= select(fd_b+1, &set_b, NULL, NULL, NULL);
        if (nfd_b) {
            if (nfd_b == -1) {
                perror("select");
                return EXIT_FAILURE;
            }
            readPipeB();
        }

        // check pipe C
        nfd_c= select(fd_c+1, &set_c, NULL, NULL, NULL);
        if (nfd_c) {
            if (nfd_c == -1) {
                perror("select");
                return EXIT_FAILURE;
            }
            readPipeC();
        }
    }

    return EXIT_SUCCESS;
}

--- Обновленный код ---

Модифицировал приложение, основываясь на отзывах здесь и прочтении:

    /*
     * FIFO example using select.
     *
     * $ mkfifo /tmp/fifo
     * $ clang -Wall -o test ./test.c
     * $ ./test &
     * $ echo 'hello' > /tmp/fifo
     * $ echo 'hello world' > /tmp/fifo
     * $ killall test
     */

    #include <sys/types.h>
    #include <sys/select.h>
    #include <sys/time.h>
    #include <sys/types.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <fcntl.h>
    #include <unistd.h>


    int readPipe(int fd)
    {
        ssize_t bytes;
        size_t total_bytes = 0;
        char buffer[100*1024];

        printf("\nDropped into read pipe\n");
        for(;;) {
            bytes = read(fd, buffer, sizeof(buffer));
            if (bytes > 0) {
                total_bytes += (size_t)bytes;
                printf("%s", buffer);
            } else {
                if (errno == EWOULDBLOCK) {
                    printf("\ndone reading (%d bytes)\n", (int)total_bytes);
                    break;
                } else {
                    perror("read");
                    return EXIT_FAILURE;
                }
            }
        }
        return EXIT_SUCCESS;
    }


    int main(int argc, char* argv[])
    {
        int fd_a, fd_b, fd_c;   // file descriptors for each pipe
        int nfd;                // select() return value
        fd_set read_fds;        // file descriptor read flags
        struct timeval tv;
        tv.tv_sec = 0;
        tv.tv_usec = 0;

        // create pipes to monitor (if they don't already exist)
        system("mkfifo /tmp/PIPE_A");
        system("mkfifo /tmp/PIPE_B");
        system("mkfifo /tmp/PIPE_C");

        // open file descriptors of named pipes to watch
        fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
        if (fd_a == -1) {
            perror("open");
            return EXIT_FAILURE;
        }

        fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
        if (fd_b == -1) {
            perror("open");
            return EXIT_FAILURE;
        }

        fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
        if (fd_c == -1) {
            perror("open");
            return EXIT_FAILURE;
        }

        FD_ZERO(&read_fds);
        FD_SET(fd_a, &read_fds);  // add pipe to the read descriptor watch list
        FD_SET(fd_b, &read_fds);
        FD_SET(fd_c, &read_fds);

        for(;;)
        {
            // check if there is new data in any of the pipes
            nfd = select(fd_a+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select");
                    return EXIT_FAILURE;
                }

                if (FD_ISSET(fd_a, &read_fds)) {
                    readPipe(fd_a);
                }
            }

            nfd = select(fd_b+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select");
                    return EXIT_FAILURE;
                }

                if (FD_ISSET(fd_b, &read_fds)){
                    readPipe(fd_b);
                }
            }
            nfd = select(fd_c+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select");
                    return EXIT_FAILURE;
                }
                if (FD_ISSET(fd_c, &read_fds)){
                    readPipe(fd_c);
                }
            }

            usleep(10);
        }
        return EXIT_SUCCESS;
    }

По-прежнему возникают проблемы с выбором, возвращающим ноль (0), когда в каком-либо из отслеживаемых каналов ожидают данные? Я не должен правильно использовать select() и fd_isset(). Вы видите, что я делаю не так? Благодарю.

4 ответа

Решение

Проблема в том, что функция выбора блокирует. Я понял select(), чтобы проверить флаги, чтобы увидеть, будет ли чтение "блокировать", если оно было выполнено, так что можно решить выполнить чтение или нет. Труба открывается в режимах RDWR и NONBLOCK.

Вы говорите, что проблема в том, что функция выбора блокирует, но продолжайте признавать, что NONBLOCK flag только делает так, чтобы чтение блокировалось. Выбрать и прочитать две разные вещи.

O_NONBLOCK флаг влияет на сокет (и, следовательно, ваш read звонки); это не меняет поведение select , который имеет свою семантику тайм-аута / блокировки.

man selectутверждает, что timeout Аргумент с обоими числовыми элементами, установленными в ноль, производит неблокирующий опрос, тогда как аргумент тайм-аута NULL может привести к неопределенному блоку:

Если параметр timeout является нулевым указателем, то вызов pselect() или select() должен блокироваться бесконечно, пока хотя бы один дескриптор не будет соответствовать указанным критериям. Чтобы выполнить опрос, параметр timeout не должен быть нулевым указателем и должен указывать на временную структуру с нулевым значением timepec.

(NB. Текст далее вверх по странице указывает на то, что, хотя pselect() занимает timespec состав, select() занимает timeval состав; Я позволил себе применить эту логику к приведенной выше цитате.)

Итак, перед каждым select вызов построить timeval установите его членов на ноль и передайте это select,

Пара замечаний, пока мы здесь:

  1. В идеале у вас был бы только один select вызов, проверяя все три файловых дескриптора одновременно, а затем решая, какие каналы read от проверки вашего набора FD с fd_isset;

  2. Я также предлагаю положить немного usleep в конце вашего тела цикла, в противном случае ваша программа будет вращаться действительно очень быстро, когда не хватает данных.

Я взял фрагмент, который использовал для программирования сокетов, но он должен работать так же для именованных каналов. Это должно быть просто и легко следовать.

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cctype>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>

int main()
{
  fd_set readSet, writeSet, exSet;
  struct timeval tv;
  int i;

  int fifoFds[3];

  //open files or named pipes and put them into fifoFds array

  while(1)
  {

    FD_ZERO(&readSet);
    FD_ZERO(&writeSet); //not used
    FD_ZERO(&exSet); //not used

    int maxfd = -1;
    for(i = 0; i < 3; i++)
    {
      if(maxfd == -1 || fifoFds[i] > maxfd) 
        maxfd = fifoFds[i];

      FD_SET(fifoFds[i], &readSet);
    }

    tv.tv_sec = 1; //wait 1 second in select, change these as needed
    tv.tv_usec = 0; //this is microseconds

    select(maxfd+1, &readSet, &writeSet, &exSet, &tv);

    for(i = 0; i < 3; i++)
    {
      if(FD_ISSET(fifoFds[i], &readSet))
      {
        //Read from that fifo now!
      }
    }

  }

  return 0;
}

Вот мое рабочее решение для чтения трех именованных каналов. Он может быть оптимизирован несколькими способами, но, как написано, это должно быть совершенно очевидно для всех, кому необходимо это сделать:

    #include <sys/types.h>
    #include <sys/select.h>
    #include <sys/time.h>
    #include <sys/types.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <fcntl.h>
    #include <unistd.h>


    int readPipe(int fd)
    {
        ssize_t bytes;
        size_t total_bytes = 0;
        char buffer[100*1024];

        printf("\nReading pipe descriptor # %d\n",fd);
        for(;;) {
            bytes = read(fd, buffer, sizeof(buffer));
            if (bytes > 0) {
                total_bytes += (size_t)bytes;
                printf("%s", buffer);
            } else {
                if (errno == EWOULDBLOCK) {
                    break;
                } else {
                    perror("read error");
                    return EXIT_FAILURE;
                }
            }
        }
        return EXIT_SUCCESS;
    }


    int main(int argc, char* argv[])
    {
        int fd_a, fd_b, fd_c;   // file descriptors for each pipe
        int nfd;                // select() return value
        fd_set read_fds;        // file descriptor read flags
        struct timeval tv;
        tv.tv_sec = 0;
        tv.tv_usec = 0;

        // create pipes to monitor (if they don't already exist)
        system("mkfifo /tmp/PIPE_A");
        system("mkfifo /tmp/PIPE_B");
        system("mkfifo /tmp/PIPE_C");

        // open file descriptors of named pipes to watch
        fd_a = open("/tmp/PIPE_A", O_RDWR | O_NONBLOCK);
        if (fd_a == -1) {
            perror("open error");
            return EXIT_FAILURE;
        }

        fd_b = open("/tmp/PIPE_B", O_RDWR | O_NONBLOCK);
        if (fd_b == -1) {
            perror("open error");
            return EXIT_FAILURE;
        }

        fd_c = open("/tmp/PIPE_C", O_RDWR | O_NONBLOCK);
        if (fd_c == -1) {
            perror("open error");
            return EXIT_FAILURE;
        }

        for(;;)
        {
            // clear fds read flags
            FD_ZERO(&read_fds);

            // check if there is new data in any of the pipes
            // PIPE_A
            FD_SET(fd_a, &read_fds);
            nfd = select(fd_a+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select error");
                    return EXIT_FAILURE;
                }
                if (FD_ISSET(fd_a, &read_fds)) {
                    readPipe(fd_a);
                }
            }

            // PIPE_B
            FD_SET(fd_b, &read_fds);
            nfd = select(fd_b+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select error");
                    return EXIT_FAILURE;
                }
                if (FD_ISSET(fd_b, &read_fds)){
                    readPipe(fd_b);
                }
            }

            // PIPE_C
            FD_SET(fd_c, &read_fds);
            nfd = select(fd_c+1, &read_fds, NULL, NULL, &tv);
            if (nfd != 0) {
                if (nfd == -1) {
                    perror("select error");
                    return EXIT_FAILURE;
                }
                if (FD_ISSET(fd_c, &read_fds)){
                    readPipe(fd_c);
                }
            }

            usleep(100000);
        }
        return EXIT_SUCCESS;
    }

Просто для того, чтобы сделать ваш код проще. Вам не нужно три выбора. Вы можете установить все бесплатные файловые дескрипторы с тремя вызовами FD_SET(), выберите вызов, и if nfd > 0 проверить каждый fd_x with FD_ISSET(),

Другие вопросы по тегам