Проблема с производительностью высокоскоростного многопоточного приемника UDP
Я стараюсь иметь относительно высокоскоростной приемник UDP (начиная со 100 МБ / с).
Я отправляю пакет UDP в локальной сети с MTU в 1500 байт (поэтому каждый пакет имеет фиксированную длину рядом с этим значением).
У меня есть три основных компонента: - Принимающий поток, который записывает полученный пакет в буфер предварительного выделения, а затем копирует данные в динамически выделяемый буфер - Поток процесса, который обрабатывает всю тяжелую проверку полученных пакетов и освобождает память - A потокобезопасный связанный список (в этом конкретном сценарии уникальной пары производитель / потребитель), который используется для передачи данных между потоком-получателем и потоком процесса
Поскольку вычислительная часть занимает намного больше времени, чем получение пакетов, я начал использовать несколько потоков процессов (и для минимизации использования блокировки каждый поток процесса имел свой собственный связанный список). И я использовал cpuset, чтобы убедиться, что все мои потоки не будут слишком сильно прерваны.
В основном у меня 4-ядерный процессор, каждый из которых может запускать 2 потока. Итак, я защитил 3 из них с помощью cpuset и запустил 3 потока процессов и поток приемника. И если я смог проследить приемную нагрузку с этими тремя потоками, то оказалось, что я начал пропускать пакеты в принимающем потоке. Единственный способ получить весь отправленный пакет состоял в том, чтобы иметь только 2 потока процессов (что в моей ситуации недостаточно для отслеживания входящих нагрузок, что приводит к переполнению ОЗУ, потому что в связанный список добавляется слишком много пакетов, а они не освободился достаточно быстро).
Я не понимаю результатов: если я зарезервировал 3 ядра (в которых, безусловно, есть другие потоки ядра, но все несущественные потоки исключены), я должен иметь возможность запускать одновременно 6 потоков, верно?
Как я могу справиться с входящей загрузкой пакетов?
Некоторый контекст:
- Я проверил возможности сетевого адаптера отправителя / получателя, и он может обрабатывать нагрузки со скоростью 120 МБ / с.
- Я проверил, получает ли мой компьютер весь пакет, ничего не делая. Он может обрабатывать около 100 МБ / с в течение короткого периода времени, но когда ему приходится делать это в течение длительного периода (например, более минуты), необходимо использовать cpuset, чтобы продолжать получать весь пакет (без выполнения каких-либо процессов после)
- Сначала я запустил код в JAVA, но даже с cpuset кажется, что код слишком медленный, чтобы просто получать все пакеты
--- Минимальный пример ---
Отправитель здесь эмулируется в потоке, но на самом деле он находится на другом компьютере.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <signal.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <sched.h>
#define NB_THREADS 3 //number thread to process the received packets
#define PACKET_SIZE 1500
#define MB_RATE 80 //second
#define BYTE_RATE (int)(MB_RATE*1000000) //second
//time to wqit between the sending of two packets
#define TIME_WAIT_US ((int)(1000000*PACKET_SIZE))/(int)BYTE_RATE //micro second
#define HOSTNAME "127.0.0.1"
#define PORT 1652
//display info each X paquets
#define DISPLAY_COUNT 1000
long counts[NB_THREADS];//count of the processed packets for each thread
long count_r = 0;//count of the processed packets for the receiver thread
long count_s = 0;//count of the processed packets for the sender thread
int socket_fd;
int stop = 0;
long count_arrray(long * counts, unsigned int size)//total count of all the processed packets
{
long count = 0;
for(int i=0; i<size; i++)
{
count+=counts[i];
}
return count;
}
/* Thread safe (as a unique couple producer/consumer) simple linked list
* The list is used to transfer data from the receiver thread to the process thread
*/
typedef struct _node Node;
struct _node
{
void * data;
Node * next;
};
typedef struct _list
{
Node * first;
Node * last;
pthread_mutex_t lock;
} LinkedList;
LinkedList * createList()
{
LinkedList * linked_list = (LinkedList *)malloc(sizeof(LinkedList));
linked_list->first = NULL;
linked_list->last = NULL;
pthread_mutex_init(&linked_list->lock, NULL);
return linked_list;
}
Node * createNode(void * data)
{
Node * node = (Node *)malloc(sizeof(Node));
node->data = data;
node->next = NULL;
return node;
}
void addList(LinkedList * linked_list, void * data)
{
Node * node = createNode(data);
int lock;
//only need to lock if we add to edit the tail of the list
if(linked_list->last==linked_list->first)
{
lock = 1;
}else
{
lock = 0;
}
if(lock)
pthread_mutex_lock(&linked_list->lock);
if(linked_list->first==NULL)
{
linked_list->first = node;
}else
{
linked_list->last->next = node;
}
linked_list->last = node;
node->next = NULL;
if(lock)
pthread_mutex_unlock(&linked_list->lock);
}
void * popList(LinkedList * linked_list)
{
Node * node = NULL;
if(linked_list->last==linked_list->first)
{
if(linked_list->first==NULL)
{
return NULL;
}
pthread_mutex_lock(&linked_list->lock);
if(linked_list->last==linked_list->first)//check if the two are still equals
{
node = linked_list->first;
node->next = NULL;
linked_list->first = NULL;
linked_list->last = NULL;
}else
{
node = linked_list->first;
linked_list->first = node->next;
node->next = NULL;
}
pthread_mutex_unlock(&linked_list->lock);
}else
{
node = linked_list->first;
if(node!=NULL)
{
linked_list->first = node->next;
node->next = NULL;
}
}
void * data = node->data;
free(node);
return data;
}
void clearList(LinkedList * linked_list)
{
Node * node = popList(linked_list);
while(node!=NULL)
{
free(node->data);
free(node);
node = popList(linked_list);
}
}
//receiving thread
void * receiver(void * data)
{
LinkedList ** listArray = (LinkedList **)data;
//char packet[1500] = {0};
char * packet;
struct sockaddr_in cliaddr;
memset(&cliaddr, 0, sizeof(cliaddr));
socklen_t len, packet_len;
len = sizeof(cliaddr);
char buffer[PACKET_SIZE];
unsigned int index = 0;
fprintf(stderr, "receiver start\n");
while(1)
{
packet_len = recvfrom(socket_fd, buffer, PACKET_SIZE,
MSG_WAITALL, (struct sockaddr *)&cliaddr,
&len);
if(packet_len <=0 || stop)
break;
char * packet = (char *)malloc(sizeof(char)*PACKET_SIZE);
for(int i=0; i<PACKET_SIZE; i++)
{
packet[i] = buffer[i];
}
addList(listArray[index], packet);
if((++index)>=NB_THREADS)
index = 0;
count_r++;
}
return NULL;
}
//receiving thread
void * sender(void * data)
{
struct sockaddr_in * servaddr = (struct sockaddr_in *)data;
socklen_t len = sizeof(*servaddr);
char packet[PACKET_SIZE] = {0};
fprintf(stderr, "sender start\n");
while(!stop)
{
sendto(socket_fd, packet, PACKET_SIZE, 0, (const struct sockaddr *) servaddr, len);
usleep((useconds_t)TIME_WAIT_US);
count_s++;
if(count_s%DISPLAY_COUNT==0)
{
fprintf(stderr, "----------\n\tcount send : %ld\n\tcount receive : %ld\n\tcount process (approx) : %ld\n----------\n",
count_s, count_r, count_arrray(counts, NB_THREADS));
}
}
return NULL;
}
//process thread
void * process(void * data)
{
static int id = -1;
id++;
LinkedList * list = (LinkedList *)data;
fprintf(stderr, "process %d start\n", id);
while(!stop)
{
char * packet = (char *)popList(list);
if(packet==NULL)
{
continue;
}
//heavy process
usleep(1000);
counts[id]++;
free(packet);
}
char * packet = (char *)popList(list);
while(packet!=NULL)
{
counts[id]++;
free(packet);
packet = (char *)popList(list);
}
return NULL;
}
//socket creation
int createSocket(struct sockaddr_in * servaddr)
{
const char * hostname = HOSTNAME;
int port = PORT;
struct hostent *hostinfo = gethostbyname(hostname); /* on récupère les informations de l'hôte auquel on veut se connecter */
if (hostinfo == NULL) /* l'hôte n'existe pas */
{
fprintf(stderr, "hostname fail\n");
exit(-1);
}
// Filling server information
memset(servaddr, 0, sizeof(servaddr));
servaddr->sin_family = hostinfo->h_addrtype;
servaddr->sin_addr = *(struct in_addr *) hostinfo->h_addr_list[0];
servaddr->sin_port = htons(port);
// Starting socket
int socket_fd = socket(hostinfo->h_addrtype, SOCK_DGRAM, 0);
if(socket_fd==-1)
{
fprintf(stderr, "socket fail\n");
exit(-1);
}
//set the socket reusable
int yes=1;
if(setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))==-1)
{
fprintf(stderr, "socket set fail\n");
exit(-1);
}
// Bind the socket with the server address
if (bind(socket_fd, (const struct sockaddr *)servaddr, sizeof(*servaddr))==-1)
{
fprintf(stderr, "bind fail %d\n", errno);
exit(-1);
}
return socket_fd;
}
int main()
{
//create socket
struct sockaddr_in servaddr;
socket_fd = createSocket(&servaddr);
//start threads
LinkedList * list_array[NB_THREADS];
pthread_t threads[NB_THREADS];
for(int i=0; i<NB_THREADS; i++)
{
list_array[i] = createList();
counts[i] = 0;
if(pthread_create(&threads[i], NULL, process, list_array[i])!=0)
{
fprintf(stderr, "pthread create process %d fail\n", i);
exit(-1);
}
}
pthread_t thread_r, thread_s;
if (pthread_create(&thread_r, NULL, receiver, list_array)!=0)
{
fprintf(stderr, "pthread create receiver fail\n");
exit(-1);
}
if (pthread_create(&thread_s, NULL, sender, &servaddr)!=0)
{
fprintf(stderr, "pthread create sender fail\n");
exit(-1);
}
//wait user input to exit
char invoer;
printf("\n");
scanf("%c",&invoer);
fprintf(stderr, "ending...");
stop = 1;
sendto(socket_fd, NULL, 0, 0, (const struct sockaddr *)&servaddr, sizeof(servaddr));
pthread_join(thread_r, NULL);
pthread_cancel(thread_s);
pthread_join(thread_s, NULL);
for(int i=0; i<NB_THREADS; i++)
{
pthread_join(threads[i], NULL);
free(list_array[i]);
}
return 0;
}