TCP Server работники с kqueue
Недавно я провел некоторое тестирование с событиями ядра и придумал следующее:
Имеет ли смысл использовать событие ядра для принятия сокетов? Мое тестирование показало, что я смог обработать только один прием за один раз (даже если массив списка событий больше)(мне кажется, что.ident == sockfd имеет значение true только для одного сокета).
Я думал, что использование kevent в основном для чтения из нескольких сокетов одновременно. Это правда?
Это как TCP-сервер делается с реализацией kqueue?:
- Прослушивание темы (без kqueue)
- Принимает новые соединения и добавляет FD к рабочему kqueue.ВОПРОС: Это вообще возможно? Мое тестирование показало да, но гарантируется ли, что рабочий поток будет знать об изменениях и действительно ли Kevent действительно безопасен для потоков?
Рабочая нить (с kqueue)
- Ожидает чтения файловых дескрипторов, добавленных из потока прослушивания.
ВОПРОС: Сколько сокетов одновременно имеет смысл проверять наличие обновлений?
Спасибо
2 ответа
Это не совсем ответ, но я сделал небольшой серверный скрипт с kqueue
объясняя проблему:
#include <stdio.h> // fprintf
#include <sys/event.h> // kqueue
#include <netdb.h> // addrinfo
#include <arpa/inet.h> // AF_INET
#include <sys/socket.h> // socket
#include <assert.h> // assert
#include <string.h> // bzero
#include <stdbool.h> // bool
#include <unistd.h> // close
int main(int argc, const char * argv[])
{
/* Initialize server socket */
struct addrinfo hints, *res;
int sockfd;
bzero(&hints, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
assert(getaddrinfo("localhost", "9090", &hints, &res) == 0);
sockfd = socket(AF_INET, SOCK_STREAM, res->ai_protocol);
assert(sockfd > 0);
{
unsigned opt = 1;
assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == 0);
#ifdef SO_REUSEPORT
assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == 0);
#endif
}
assert(bind(sockfd, res->ai_addr, res->ai_addrlen) == 0);
freeaddrinfo(res);
/* Start to listen */
(void)listen(sockfd, 5);
{
/* kevent set */
struct kevent kevSet;
/* events */
struct kevent events[20];
/* nevents */
unsigned nevents;
/* kq */
int kq;
/* buffer */
char buf[20];
/* length */
ssize_t readlen;
kevSet.data = 5; // backlog is set to 5
kevSet.fflags = 0;
kevSet.filter = EVFILT_READ;
kevSet.flags = EV_ADD;
kevSet.ident = sockfd;
kevSet.udata = NULL;
assert((kq = kqueue()) > 0);
/* Update kqueue */
assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);
/* Enter loop */
while (true) {
/* Wait for events to happen */
nevents = kevent(kq, NULL, 0, events, 20, NULL);
assert(nevents >= 0);
fprintf(stderr, "Got %u events to handle...\n", nevents);
for (unsigned i = 0; i < nevents; ++i) {
struct kevent event = events[i];
int clientfd = (int)event.ident;
/* Handle disconnect */
if (event.flags & EV_EOF) {
/* Simply close socket */
close(clientfd);
fprintf(stderr, "A client has left the server...\n");
} else if (clientfd == sockfd) {
int nclientfd = accept(sockfd, NULL, NULL);
assert(nclientfd > 0);
/* Add to event list */
kevSet.data = 0;
kevSet.fflags = 0;
kevSet.filter = EVFILT_READ;
kevSet.flags = EV_ADD;
kevSet.ident = nclientfd;
kevSet.udata = NULL;
assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);
fprintf(stderr, "A new client connected to the server...\n");
(void)write(nclientfd, "Welcome to this server!\n", 24);
} else if (event.flags & EVFILT_READ) {
/* sleep for "processing" time */
readlen = read(clientfd, buf, sizeof(buf));
buf[readlen - 1] = 0;
fprintf(stderr, "bytes %zu are available to read... %s \n", (size_t)event.data, buf);
sleep(4);
} else {
fprintf(stderr, "unknown event: %8.8X\n", event.flags);
}
}
}
}
return 0;
}
Каждый раз, когда клиент отправляет что-то, сервер испытывает "задержку" в 4 секунды. (Я немного утрировал, но для тестирования вполне разумно). Так как же обойти эту проблему? Я вижу рабочие потоки (пул) со своими kqueue
как возможное решение, тогда не будет никакой задержки соединения. (каждый рабочий поток читает определенный "диапазон" файловых дескрипторов)
Обычно вы используете kqueue в качестве альтернативы потокам. Если вы собираетесь использовать потоки, вы можете просто настроить прослушивающий поток и рабочий поток потоков с одним потоком для каждого принятого соединения. Это гораздо более простая модель программирования.
В управляемой событиями инфраструктуре вы помещаете и прослушивающий сокет, и все принятые сокеты в kqueue, а затем обрабатывает события по мере их возникновения. Когда вы принимаете сокет, вы добавляете его в kqueue, а когда обработчик сокета завершает его работу, он может удалить сокет из kqueue. (Последнее обычно не требуется, потому что закрытие fd автоматически удаляет любые связанные события из любого kqueue.)
Обратите внимание, что каждое событие, зарегистрированное в kqueue, имеет void*
userdata, которые могут быть использованы для определения желаемого действия при возникновении события. Поэтому не обязательно, чтобы в каждой очереди событий был уникальный обработчик событий; на самом деле, часто встречаются разные обработчики. (Например, вы также можете обрабатывать канал управления, настроенный через именованный канал.)
Гибридные модели событий / потоков, безусловно, возможны; в противном случае вы не сможете воспользоваться преимуществами многоядерных процессоров. Одна из возможных стратегий - использовать очередь событий в качестве диспетчера в модели производитель-потребитель. Обработчик очереди будет непосредственно обрабатывать события на слушающем сокете, принимая соединение и добавляя принятый fd в очередь событий. Когда происходит событие соединения с клиентом, это событие будет помещено в рабочую очередь для последующей обработки. Также возможно иметь несколько рабочих очередей, по одной на поток, и иметь предполагаемое предположение, в какую рабочую очередь должно быть помещено новое соединение, предположительно на основе текущей нагрузки этого потока.