Проблема с производительностью высокоскоростного многопоточного приемника UDP

Я стараюсь иметь относительно высокоскоростной приемник UDP (начиная со 100 МБ / с).

Я отправляю пакет UDP в локальной сети с MTU в 1500 байт (поэтому каждый пакет имеет фиксированную длину рядом с этим значением).

У меня есть три основных компонента: - Принимающий поток, который записывает полученный пакет в буфер предварительного выделения, а затем копирует данные в динамически выделяемый буфер - Поток процесса, который обрабатывает всю тяжелую проверку полученных пакетов и освобождает память - A потокобезопасный связанный список (в этом конкретном сценарии уникальной пары производитель / потребитель), который используется для передачи данных между потоком-получателем и потоком процесса

Поскольку вычислительная часть занимает намного больше времени, чем получение пакетов, я начал использовать несколько потоков процессов (и для минимизации использования блокировки каждый поток процесса имел свой собственный связанный список). И я использовал cpuset, чтобы убедиться, что все мои потоки не будут слишком сильно прерваны.

В основном у меня 4-ядерный процессор, каждый из которых может запускать 2 потока. Итак, я защитил 3 из них с помощью cpuset и запустил 3 потока процессов и поток приемника. И если я смог проследить приемную нагрузку с этими тремя потоками, то оказалось, что я начал пропускать пакеты в принимающем потоке. Единственный способ получить весь отправленный пакет состоял в том, чтобы иметь только 2 потока процессов (что в моей ситуации недостаточно для отслеживания входящих нагрузок, что приводит к переполнению ОЗУ, потому что в связанный список добавляется слишком много пакетов, а они не освободился достаточно быстро).

Я не понимаю результатов: если я зарезервировал 3 ядра (в которых, безусловно, есть другие потоки ядра, но все несущественные потоки исключены), я должен иметь возможность запускать одновременно 6 потоков, верно?

Как я могу справиться с входящей загрузкой пакетов?

Некоторый контекст:

  • Я проверил возможности сетевого адаптера отправителя / получателя, и он может обрабатывать нагрузки со скоростью 120 МБ / с.
  • Я проверил, получает ли мой компьютер весь пакет, ничего не делая. Он может обрабатывать около 100 МБ / с в течение короткого периода времени, но когда ему приходится делать это в течение длительного периода (например, более минуты), необходимо использовать cpuset, чтобы продолжать получать весь пакет (без выполнения каких-либо процессов после)
  • Сначала я запустил код в JAVA, но даже с cpuset кажется, что код слишком медленный, чтобы просто получать все пакеты

--- Минимальный пример ---

Отправитель здесь эмулируется в потоке, но на самом деле он находится на другом компьютере.

#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <string.h> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <netinet/in.h> 
#include <netdb.h> 
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <signal.h>  
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <sched.h>

#define NB_THREADS 3 //number thread to process the received packets
#define PACKET_SIZE 1500

#define MB_RATE 80 //second
#define BYTE_RATE (int)(MB_RATE*1000000) //second
//time to wqit between the sending of two packets
#define TIME_WAIT_US ((int)(1000000*PACKET_SIZE))/(int)BYTE_RATE //micro second

#define HOSTNAME "127.0.0.1"
#define PORT 1652

//display info each X paquets
#define DISPLAY_COUNT 1000

long counts[NB_THREADS];//count of the processed packets for each thread
long count_r = 0;//count of the processed packets for the receiver thread
long count_s = 0;//count of the processed packets for the sender thread

int socket_fd;
int stop = 0;


long count_arrray(long * counts, unsigned int size)//total count of all the processed packets
{
    long count  = 0;
    for(int i=0; i<size; i++)
    {
        count+=counts[i];
    }

    return count;
}

/* Thread safe (as a unique couple producer/consumer) simple linked list
 * The list is used to transfer data from the receiver thread to the process thread
*/
typedef struct _node Node;

struct _node
{
    void * data;
    Node * next;
};

typedef struct _list
{
    Node * first;
    Node * last;
    pthread_mutex_t lock;

} LinkedList;

LinkedList * createList()
{
    LinkedList *  linked_list =  (LinkedList *)malloc(sizeof(LinkedList));
    linked_list->first = NULL;
    linked_list->last = NULL;
    pthread_mutex_init(&linked_list->lock, NULL);

    return linked_list;
}

Node * createNode(void * data)
{
    Node * node = (Node *)malloc(sizeof(Node));

    node->data = data;
    node->next = NULL;

    return node;
}


void addList(LinkedList * linked_list, void * data)
{
    Node * node = createNode(data);

    int lock;

    //only need to lock if we add to edit the tail of the list
    if(linked_list->last==linked_list->first) 
    {
        lock = 1;
    }else
    {
        lock = 0;
    }


    if(lock)
        pthread_mutex_lock(&linked_list->lock);

    if(linked_list->first==NULL)
    {
        linked_list->first = node;
    }else
    {
        linked_list->last->next = node;
    }
    linked_list->last = node;
    node->next = NULL;

    if(lock)
        pthread_mutex_unlock(&linked_list->lock);
}



void * popList(LinkedList * linked_list)
{

    Node * node = NULL;
    if(linked_list->last==linked_list->first) 
    {
        if(linked_list->first==NULL)
        {
            return NULL;
        }

        pthread_mutex_lock(&linked_list->lock);

        if(linked_list->last==linked_list->first)//check if the two are still equals
        {
            node = linked_list->first;
            node->next = NULL;
            linked_list->first = NULL;
            linked_list->last = NULL;

        }else
        {
            node = linked_list->first;
            linked_list->first = node->next;
            node->next = NULL;
        }

        pthread_mutex_unlock(&linked_list->lock);

    }else
    {
        node = linked_list->first;
        if(node!=NULL)
        {
            linked_list->first = node->next;
            node->next = NULL;
        }
    }

    void * data = node->data;
    free(node);

    return data;
}

void clearList(LinkedList * linked_list)
{
    Node * node = popList(linked_list);

    while(node!=NULL)
    {
        free(node->data);
        free(node);

        node = popList(linked_list);
    }
}


//receiving thread
void * receiver(void * data)
{
    LinkedList ** listArray = (LinkedList **)data;

    //char packet[1500] = {0};
    char * packet;

    struct sockaddr_in  cliaddr;
    memset(&cliaddr, 0, sizeof(cliaddr)); 

    socklen_t len, packet_len; 
    len = sizeof(cliaddr);

    char buffer[PACKET_SIZE];

    unsigned int index = 0;
    fprintf(stderr, "receiver start\n");
    while(1)
    {
        packet_len = recvfrom(socket_fd, buffer, PACKET_SIZE,  
                    MSG_WAITALL, (struct sockaddr *)&cliaddr, 
                    &len);
        if(packet_len <=0 || stop)
            break;

        char * packet = (char *)malloc(sizeof(char)*PACKET_SIZE);
        for(int i=0; i<PACKET_SIZE; i++)
        {
            packet[i] = buffer[i];
        }

        addList(listArray[index], packet);
        if((++index)>=NB_THREADS)
            index = 0;

        count_r++;
    }

    return NULL;
}


//receiving thread
void * sender(void * data)
{
    struct sockaddr_in  * servaddr = (struct sockaddr_in  *)data;


    socklen_t len = sizeof(*servaddr);
    char packet[PACKET_SIZE] = {0};

    fprintf(stderr, "sender start\n");

    while(!stop)
    {
        sendto(socket_fd, packet, PACKET_SIZE, 0, (const struct sockaddr *) servaddr, len); 
        usleep((useconds_t)TIME_WAIT_US);

        count_s++;
        if(count_s%DISPLAY_COUNT==0)
        {
            fprintf(stderr, "----------\n\tcount send : %ld\n\tcount receive : %ld\n\tcount process (approx) : %ld\n----------\n", 
                count_s, count_r, count_arrray(counts, NB_THREADS));
        }
    }

    return NULL;
}


//process thread
void * process(void * data)
{
    static int id = -1;
    id++;

    LinkedList * list = (LinkedList *)data;

    fprintf(stderr, "process %d start\n", id);

    while(!stop)
    {
        char * packet = (char *)popList(list);
        if(packet==NULL)
        {
            continue;
        }

        //heavy process
        usleep(1000);

        counts[id]++;

        free(packet);
    }

    char * packet = (char *)popList(list);
    while(packet!=NULL)
    {
        counts[id]++;

        free(packet);

        packet = (char *)popList(list);
    }

    return NULL;
}



//socket creation

int createSocket(struct sockaddr_in  * servaddr)
{
    const char * hostname = HOSTNAME;
    int port = PORT;

    struct hostent *hostinfo = gethostbyname(hostname); /* on récupère les informations de l'hôte auquel on veut se connecter */
    if (hostinfo == NULL) /* l'hôte n'existe pas */
    {
        fprintf(stderr, "hostname fail\n");
        exit(-1);
    }

    // Filling server information 
    memset(servaddr, 0, sizeof(servaddr)); 

    servaddr->sin_family = hostinfo->h_addrtype;
    servaddr->sin_addr = *(struct in_addr *) hostinfo->h_addr_list[0]; 
    servaddr->sin_port = htons(port); 

    // Starting socket
    int socket_fd = socket(hostinfo->h_addrtype, SOCK_DGRAM, 0);
    if(socket_fd==-1) 
    { 
        fprintf(stderr, "socket fail\n");
        exit(-1);
    }

    //set the socket reusable
    int yes=1;
    if(setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))==-1) 
    {
        fprintf(stderr, "socket set fail\n");
        exit(-1);
    }

    // Bind the socket with the server address 
    if (bind(socket_fd, (const struct sockaddr *)servaddr, sizeof(*servaddr))==-1) 
    { 
        fprintf(stderr, "bind fail %d\n", errno);
        exit(-1);
    }

    return socket_fd;
}



int main()
{
    //create socket
    struct sockaddr_in servaddr;
    socket_fd = createSocket(&servaddr);


    //start threads
    LinkedList * list_array[NB_THREADS];
    pthread_t threads[NB_THREADS];

    for(int i=0; i<NB_THREADS; i++)
    {
        list_array[i] = createList();
        counts[i] = 0;
        if(pthread_create(&threads[i], NULL, process, list_array[i])!=0) 
        {
            fprintf(stderr, "pthread create process %d fail\n", i);
            exit(-1);
        }
    }

    pthread_t thread_r, thread_s;
    if (pthread_create(&thread_r, NULL, receiver, list_array)!=0) 
    {
        fprintf(stderr, "pthread create receiver fail\n");
        exit(-1);
    }

    if (pthread_create(&thread_s, NULL, sender, &servaddr)!=0) 
    {
        fprintf(stderr, "pthread create sender fail\n");
        exit(-1);
    }

    //wait user input to exit
    char invoer;
    printf("\n");
    scanf("%c",&invoer); 
    fprintf(stderr, "ending...");

    stop = 1;
    sendto(socket_fd, NULL, 0, 0, (const struct sockaddr *)&servaddr, sizeof(servaddr)); 
    pthread_join(thread_r, NULL);

    pthread_cancel(thread_s);
    pthread_join(thread_s, NULL);

    for(int i=0; i<NB_THREADS; i++)
    {
        pthread_join(threads[i], NULL);
        free(list_array[i]);
    }

    return 0;
}

0 ответов

Другие вопросы по тегам