recvmmsg/recv/recv из блоков под высокой нагрузкой

У нас есть приложение на Centos 6, которое вызывает recvmmsg() на многоадресный адрес для чтения 1024 пакетов UDP одновременно. Когда мы запускаем несколько экземпляров этого приложения в одном и том же окне (все слушают один и тот же трафик), иногда этот вызов блокируется на несколько секунд, несмотря на то, что сокет не является блокирующим, и передает MSG_DONTWAIT, Он отлично работает при любых других обстоятельствах, но будет зависать при высокой нагрузке (50 МБ / с). Когда приложение блокируется, мы отстаем в трафике UDP и не можем восстановиться. Процесс выполняется с использованием планировщика RR в качестве высокого приоритета, чтобы избежать помех от других процессов. Мы попытались перейти на recvfrom() а также recv() в цикле, а с теми же результатами.

Единственное, что мы можем увидеть в исходном коде ядра, которое может это заблокировать, это spin_lock_irqsave() в очереди заблокировать __skb_try_recv_datagram(), Но я не знаю, при каких обстоятельствах это будет проблемой, или что с этим делать, чтобы предотвратить блокировку, или действительно ли это проблема.

Я не уверен, где искать дальше, поэтому любые указатели будут оценены.

Создана очень простая программа, которая может воспроизвести это на одном из серверов, где мы видим это (не вставила функцию поиска интерфейса, но это не должно быть актуально, дайте мне знать, если вам это нужно).

Пример recv():

int main(){
    int fd = socket(AF_INET,SOCK_DGRAM,0);
    int flags = fcntl(fd,F_GETFL,0);
    fcntl(fd,F_SETFL, flags | O_NONBLOCK);
    int reuse = 1;
    setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof(reuse));
    struct sockaddr_in sockaddr;
    sockaddr.sin_port = htons(4755);
    sockaddr.sin_family = AF_INET;
    sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(fd,(struct sockaddr*)&sockaddr,sizeof(sockaddr)) < 0){ 
      printf("Failed to bind.\n");
      return 1;
    }

    in_addr_t interface;
    if(!getInterface("192.168.15.255",&interface)){
      printf("Failed to get interface.\n");
      return 1;
    }
    struct ip_mreq imr;
    memset(&imr,0,sizeof(imr));
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    if(!IN_MULTICAST(htonl(imr.imr_multiaddr.s_addr))){
      printf("Group not in multicast.");
      return 1;
    }
    if(setsockopt(fd,IPPROTO_IP,IP_ADD_MEMBERSHIP, (char*)&imr, sizeof(imr))    < 0){ 
      printf("Failed to add membership, errno: %d.\n",errno);
      return 1;
    }

    int epollInstance = epoll_create1(0);
    struct epoll_event* epollEvents = (struct epoll_event*)malloc(sizeof(struct epoll_event)*8192);
    epollEvents[0].events = EPOLLIN;
    epoll_ctl(epollInstance,EPOLL_CTL_ADD,fd,&epollEvents[0]);

    const int PACKETS_TO_READ = 1024;
    static char receiveBuffer[PACKETS_TO_READ][USHRT_MAX];
    static struct iovec iovecs[PACKETS_TO_READ];
    static struct mmsghdr msgs[PACKETS_TO_READ];
    static struct sockaddr_in sockFrom[PACKETS_TO_READ];
    for (int i = 0; i < PACKETS_TO_READ; i++) {
      iovecs[i].iov_base         = receiveBuffer[i];
      iovecs[i].iov_len          = USHRT_MAX;
      msgs[i].msg_hdr.msg_iov    = &iovecs[i];
      msgs[i].msg_hdr.msg_iovlen = 1;
      msgs[i].msg_hdr.msg_name = &sockFrom[i];
      msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in );
    }

    struct timeval start;
    struct timeval end;  

    while(1){
      int selected = epoll_wait(epollInstance,epollEvents,8192,10);
      if(selected > 0){ 
        gettimeofday(&start,NULL);
        // uncomment this line and comment out the below for loop to switch to recvmmsg, both show the issue
        //  int numPackets = recvmmsg(fd,msgs,PACKETS_TO_READ,MSG_DONTWAIT,0);
        int numPackets = 0;
        for(int i = 0; i < PACKETS_TO_READ; i++){
        int result = recv(fd,receiveBuffer[0],USHRT_MAX,MSG_DONTWAIT);
        if(result == EAGAIN) break;
          numPackets++;
        }
        gettimeofday(&end,NULL);
        printf("Got %d packets in %lu microseconds\n",numPackets, (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec);
      }   
    }
}

Пример recvmmsg():

int main(){
    int fd = socket(AF_INET,SOCK_DGRAM,0);
    int flags = fcntl(fd,F_GETFL,0);
    fcntl(fd,F_SETFL, flags | O_NONBLOCK);
    int reuse = 1;
    setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof(reuse));
    struct sockaddr_in sockaddr;
    sockaddr.sin_port = htons(4755);
    sockaddr.sin_family = AF_INET;
    sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(fd,(struct sockaddr*)&sockaddr,sizeof(sockaddr)) < 0){ 
      printf("Failed to bind.\n");
      return 1;
    }

    in_addr_t interface;
    if(!getInterface("192.168.15.255",&interface)){
      printf("Failed to get interface.\n");
      return 1;
    }
    struct ip_mreq imr;
    memset(&imr,0,sizeof(imr));
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    if(!IN_MULTICAST(htonl(imr.imr_multiaddr.s_addr))){
      printf("Group not in multicast.");
      return 1;
    }
    if(setsockopt(fd,IPPROTO_IP,IP_ADD_MEMBERSHIP, (char*)&imr, sizeof(imr))    < 0){ 
      printf("Failed to add membership, errno: %d.\n",errno);
      return 1;
    }

    int epollInstance = epoll_create1(0);
    struct epoll_event* epollEvents = (struct epoll_event*)malloc(sizeof(struct epoll_event)*8192);
    epollEvents[0].events = EPOLLIN;
    epoll_ctl(epollInstance,EPOLL_CTL_ADD,fd,&epollEvents[0]);

    const int PACKETS_TO_READ = 1024;
    static char receiveBuffer[PACKETS_TO_READ][USHRT_MAX];
    static struct iovec iovecs[PACKETS_TO_READ];
    static struct mmsghdr msgs[PACKETS_TO_READ];
    static struct sockaddr_in sockFrom[PACKETS_TO_READ];
    for (int i = 0; i < PACKETS_TO_READ; i++) {
      iovecs[i].iov_base         = receiveBuffer[i];
      iovecs[i].iov_len          = USHRT_MAX;
      msgs[i].msg_hdr.msg_iov    = &iovecs[i];
      msgs[i].msg_hdr.msg_iovlen = 1;
      msgs[i].msg_hdr.msg_name = &sockFrom[i];
      msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in );
    }

    struct timeval start;
    struct timeval end;  

    while(1){
      int selected = epoll_wait(epollInstance,epollEvents,8192,10);
      if(selected > 0){ 
        gettimeofday(&start,NULL);
        // uncomment this line and comment out the below for loop to switch to recvmmsg, both show the issue
        int numPackets = recvmmsg(fd,msgs,PACKETS_TO_READ,MSG_DONTWAIT,0);
        gettimeofday(&end,NULL);
        printf("Got %d packets in %lu microseconds\n",numPackets, (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec);
      }   
    }
}

0 ответов

Другие вопросы по тегам