Эполл: потерять некоторые события EPOLLOUT?
Вот так выглядит мой сервер:
-WorkerThread (ы):
- вызывает epoll_wait, принимает соединения, устанавливает неблокирование fd (EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP)
- вызывает recv до EAGAIN для события EPOLLIN и помещает все данные в глобальный RecvBuffer(синхронизируется pthread_mutex)
- по событию EPOLLOUT: обращается к глобальному SendBuffer и, если есть данные, которые нужно отправить для текущего готового fd, сделайте это (в цикле while до EAGAIN или до тех пор, пока не будут отправлены все данные; когда весь пакет отправлен, извлеките его из SendBuffer)
-IOThread (ы)
- берет данные из глобального RecvBuffer, обрабатывает их
- отправляет ответ, сначала пытаясь позвонить отправить сразу. Если не все данные отправлены, отправьте оставшуюся часть в глобальный SendBuffer для отправки из WorkerThread)
Проблема в том, что сервер не отправляет все данные из очереди (они остаются в SendBuffer), и количество "не отправленных" данных увеличивается с увеличением числа клиентов. Ради тестирования я использую только 1 рабочий поток и 1 iothread, но, кажется, это не имеет никакого значения, если я использую больше. Доступ к глобальным буферам защищен с помощью pthread_mutex. Кроме того, мой размер данных ответа составляет 130 тыс. Байт (для отправки такого количества данных требуется не менее 3-х вызовов). С другой стороны - Windows-клиент, использующий блокирующие сокеты.
Большое спасибо! MJ
редактировать:
Да, по умолчанию я жду событий EPOLLOUT, даже если мне нечего отправить. Для простоты реализации и руководства man-страницы я сделал это следующим образом. Кроме того, мое понимание этого было так:
Даже если я "пропускаю" событие EPOLLOUT в то время, когда я не хочу отправлять что-либо, это не проблема, потому что, когда я хочу отправить данные, я буду вызывать send до тех пор, пока в будущем не будут запущены EAGAIN и EPOLLOUT (и это происходит в большинстве случаев)
Теперь я изменил код для переключения между событиями IN/OUT:
Примите:
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_ADD, infd, &event);
когда все данные были отправлены:
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);
когда я достигну EAGAIN, позвонив send in IOThread:
event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);
..и я получаю такое же поведение. Также я попытался удалить флаг EPOLLET и ничего не изменилось
Односторонний вопрос: заменяет ли epoll_ctl с флагом EPOLL_CTL_MOD член события или просто ИЛИ с данным аргументом?
EDIT3: обновлена функция IOThread для непрерывной отправки до тех пор, пока не будут отправлены все данные, или до EAGAIN. Я также пытался отправить, даже если я отправил все данные, но большую часть времени я получал errno 88 Socket операция на не-сокете
РЕДАКТИРОВАТЬ 4: я исправил некоторые ошибки в моем "коде отправки", поэтому я не получаю никаких данных в очереди, не отправленных сейчас.. Но я не получаю столько данных, сколько должен был:)) Наибольшее количество "пропущенных" (не полученных) данных я получить, когда клиент вызывает recv сразу, когда отправка завершена, и она увеличивается с ростом числа клиентов. Когда я помещаю 2-секундную задержку между отправкой и вызовом recv на клиенте (блокировка вызовов), я теряю практически все данные на сервере, в зависимости от того, сколько клиентов работает (тестовый код клиента включает простой цикл for с 1 отправкой и 1 вызовом recv после него) Опять же, попробовал с и без режима ET. Ниже обновлена функция WorkerThread, которая отвечает за получение данных. @Admins/Mods Может быть, я должен открыть новую тему сейчас, поскольку проблема немного другая?
void CNetServer::WorkerThread(void* param)
{
CNetServer* pNetServer =(CNetServer*)param;
struct epoll_event event;
struct epoll_event *events;
int s = 0;
// events = (epoll_event*)calloc (MAXEVENTS, sizeof event);
while (1)
{
int n, i;
// printf ("BLOCKING NOW! epoll_wait thread %d\n",pthread_self());
n = pNetServer->m_epollCtrl.Wait(-1);
// printf ("epoll_wait thread %d\n",pthread_self());
pthread_mutex_lock(&g_mtx_WorkerThd);
for (i = 0; i < n; i++)
{
if ((pNetServer->m_epollCtrl.Event(i)->events & EPOLLERR))
{
// An error has occured on this fd, or the socket is not ready for reading (why were we notified then?)
// g_SendBufferArray.RemoveAll( 0 );
char szFileName[30] = {0};
sprintf( (char*)szFileName,"fd_%d.txt",pNetServer->m_epollCtrl.Event(i)->data.fd );
remove(szFileName);
/* printf( "\n\n\n");
printf( "\tDATA LEFT COUNT:%d\n",g_SendBufferArray.size());
for (int k=0;k<g_SendBufferArray.size();k++)
printf( "\tSD: %d DATA LEFT:%d\n",g_SendBufferArray[i]->sd,g_SendBufferArray[i]->nBytesSent );
*/
// fprintf (stderr, "epoll error\n");
// fflush(stdout);
close (pNetServer->m_epollCtrl.Event(i)->data.fd);
continue;
}
else if (pNetServer->m_ListenSocket == pNetServer->m_epollCtrl.Event(i)->data.fd)
{
// We have a notification on the listening socket, which means one or more incoming connections.
while (1)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof in_addr;
infd = accept (pNetServer->m_ListenSocket, &in_addr, &in_len);
if (infd == -1)
{
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK))
{
// We have processed all incoming connections.
break;
}
else
{
perror ("accept");
break;
}
}
s = getnameinfo (&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0)
{
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s) thread %d\n", infd, hbuf, sbuf,pthread_self());
}
// Make the incoming socket non-blocking and add it to the list of fds to monitor.
CEpollCtrl::SetNonBlock(infd,true);
if ( !pNetServer->m_epollCtrl.Add( infd, EPOLLIN, NULL ))
{
perror ("epoll_ctl");
abort ();
}
}
continue;
}
if( (pNetServer->m_epollCtrl.Event(i)->events & EPOLLOUT) )
{
pNetServer->DoSend( pNetServer->m_epollCtrl.Event(i)->data.fd );
}
if( pNetServer->m_epollCtrl.Event(i)->events & EPOLLIN )
{
printf("EPOLLIN TRIGGERED FOR SD: %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
// We have data on the fd waiting to be read.
int done = 0;
ssize_t count = 0;
char buf[512];
while (1)
{
count = read (pNetServer->m_epollCtrl.Event(i)->data.fd, buf, sizeof buf);
printf("recv sd %d size %d thread %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd,count,pthread_self());
if (count == -1)
{
// If errno == EAGAIN, that means we have read all data. So go back to the main loop.
if ( errno != EAGAIN )
{
perror ("read");
done = 1;
}
break;
}
else if (count == 0)
{
//connection is closed by peer.. do a cleanup and close
done = 1;
break;
}
else if (count > 0)
{
static int nDataCounter = 0;
nDataCounter+=count;
printf("RECVDDDDD %d\n",nDataCounter);
CNetServer::s_pRecvContainer->OnData( pNetServer->m_epollCtrl.Event(i)->data.fd, buf, count );
}
}
if (done)
{
printf ("Closed connection on descriptor %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
// Closing the descriptor will make epoll remove it from the set of descriptors which are monitored.
close (pNetServer->m_epollCtrl.Event(i)->data.fd);
}
}
}
//
pNetServer->IOThread( (void*)pNetServer );
pthread_mutex_unlock(&g_mtx_WorkerThd);
}
}
void CNetServer::IOThread(void* param)
{
BYTEARRAY* pbPacket = new BYTEARRAY;
int fd;
struct epoll_event event;
CNetServer* pNetServer =(CNetServer*)param;
printf("IOThread startin' !\n");
for (;;)
{
bool bGotIt = CNetServer::s_pRecvContainer->GetPacket( pbPacket, &fd );
if( bGotIt )
{
//process packet here
printf("Got 'em packet yo !\n");
BYTE* re = new BYTE[128000];
memset((void*)re,0xCC,128000);
buffer_t* responsebuff = new buffer_t( fd, re, 128000 ) ;
pthread_mutex_lock(&g_mtx_WorkerThd);
while( 1 )
{
int s;
int nSent = send( responsebuff->sd, ( responsebuff->pbBuffer + responsebuff->nBytesSent ),responsebuff->nSize - responsebuff->nBytesSent,0 );
printf ("IOT: Trying to send nSent: %d buffsize: %d \n",nSent,responsebuff->nSize - responsebuff->nBytesSent);
if (nSent == -1)
{
if (errno == EAGAIN || errno == EWOULDBLOCK )
{
g_vSendBufferArray.push_back( *responsebuff );
printf ("IOT: now waiting for EPOLLOUT\n");
event.data.fd = fd;
event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
s = epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, fd, &event);
break;
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}
}
else
{
printf( "%d\n",errno );
perror ("send");
break;
}
printf ("IOT: WOOOOT\n");
break;
}
else if (nSent == responsebuff->nSize - responsebuff->nBytesSent)
{
printf ("IOT:all is sent! wOOhOO\n");
responsebuff->sd = 0;
responsebuff->nBytesSent += nSent;
delete responsebuff;
break;
}
else if (nSent < responsebuff->nSize - responsebuff->nBytesSent)
{
printf ("IOT: partial send!\n");
responsebuff->nBytesSent += nSent;
}
}
delete [] re;
pthread_mutex_unlock(&g_mtx_WorkerThd);
}
}
}
1 ответ
Прекратите использовать EPOLLET. Это почти невозможно получить право.
Не спрашивайте о событиях EPOLLOUT, если вам нечего отправить.
Если у вас есть данные для отправки по соединению, следуйте этой логике:
A) Если в вашей очереди отправки уже есть данные для этого соединения, просто добавьте новые данные. Вы сделали
Б) Попробуйте отправить данные немедленно. Если вы отправите все это, все готово.
C) Сохраните оставшиеся данные в очереди отправки для этого соединения. Теперь попросите EPOLLOUT для этого соединения.