Как реализовать tcp с нулевым копированием, используя циклический буфер без блокировки в C++
У меня есть несколько потоков, которые должны потреблять данные из потока TCP. Я хочу использовать кольцевой буфер / очередь в общей памяти для чтения из сокета TCP. Прием TCP будет записывать прямо в кольцевую очередь. Потребители будут читать из очереди.
Эта конструкция должна включать нулевое копирование и нулевую блокировку. Однако здесь есть 2 разных вопроса.
Возможно ли / эффективно читать только одно логическое сообщение из сокета TCP? Если нет, и я прочитал более 1 сообщения, мне придется скопировать остатки из этого в это -> далее.
Реально ли реализовать очередь без блокировки? Я знаю, что есть атомные операции, но они тоже могут быть дорогостоящими. потому что весь кэш процессора должен быть признан недействительным. Это повлияет на все операции на всех моих 24 ядрах.
Я немного заржавел в низкоуровневом TCP, и не совсем ясно, как определить, когда сообщение завершено. Я ищу \0 или это конкретная реализация?
ти
2 ответа
К сожалению, TCP не может передавать сообщения, только байтовые потоки. Если вы хотите передавать сообщения, вам придется применить протокол сверху. Наилучшими протоколами для высокой производительности являются те, которые используют проверяемый на разумность заголовок, указывающий длину сообщения - это позволяет вам считывать правильное количество данных непосредственно в подходящий буферный объект без итерации побайтовых данных в поисках конечного символ сообщения. Затем буфер POINTER может быть поставлен в очередь в другой поток, и для следующего сообщения будет создан / удален новый буферный объект. Это позволяет избежать копирования больших объемов данных и является достаточно эффективным для больших сообщений, так что использование неблокирующей очереди для указателей на объект сообщения несколько бессмысленно.
Следующим доступным вариантом оптимизации является объединение буферов объектов *, чтобы избежать постоянного нового / удаления, повторного использования буферов * в потоке-потребителе для повторного использования в потоке-получателе сети. Это довольно легко сделать с помощью ConcurrentQueue, предпочтительно блокируя, чтобы разрешить управление потоком вместо повреждения данных или segfaults/AV, если пул временно очищается.
Затем добавьте [размер кэширования] 'dead-zone' в начале каждого * элемента данных буфера, чтобы ни один поток не мог ложно делиться данными с любым другим.
Результатом должно стать широкополосное поступление полных сообщений в поток потребителя с минимальной задержкой, потерей ресурсов ЦП или перегрузкой кеша. Все ваши 24 ядра могут работать на разных данных.
Копирование больших объемов данных в многопоточных приложениях - признание плохого дизайна и поражения.
Следовать за..
Похоже, вы застряли в итерации данных из-за разных протоколов:(
Буферный объект PDU без ложного разделения, пример:
typedef struct{
char deadZone[256]; // anti-false-sharing
int dataLen;
char data[8388608]; // 8 meg of data
} SbufferData;
class TdataBuffer: public{
private:
TbufferPool *myPool; // reference to pool used, in case more than one
EpduState PDUstate; // enum state variable used to decode protocol
protected:
SbufferData netData;
public:
virtual reInit(); // zeros dataLen, resets PDUstate etc. - call when depooling a buffer
virtual int loadPDU(char *fromHere,int len); // loads protocol unit
release(); // pushes 'this' back onto 'myPool'
};
loadPDU получает указатель на длину необработанных сетевых данных. Он возвращает либо 0 - означает, что он еще не полностью собрал PDU, либо количество байтов, которые он съел из необработанных сетевых данных, чтобы полностью собрать PDU, в этом случае поставьте его в очередь, отмените еще один и вызовите loadPDU() с неиспользованным остатком необработанных данных, затем перейдите к следующим необработанным данным.
При необходимости вы можете использовать разные пулы разных производных буферных классов для обслуживания разных протоколов - массив TbufferPool[Eprotocols]. TbufferPool может быть просто очередью BlockingCollection. Управление становится почти тривиальным - буферы могут быть отправлены в очереди по всей вашей системе, в графический интерфейс для отображения статистики, а затем, возможно, в логгер, если в конце цепочки очередей что-то вызывает release().
Очевидно, что "реальный" объект PDU будет иметь больше методов, объединений / структур данных, может быть, итераторов и механизма состояний для работы с протоколом, но в любом случае это основная идея. Главное - это простое управление, инкапсуляция и, поскольку никакие два потока не могут работать с одним экземпляром буфера, не требуется блокировка / синхронизация для анализа / доступа к данным.
О, да, и поскольку ни одна очередь не должна оставаться заблокированной дольше, чем требуется для нажатия / выталкивания одного указателя, шансы фактического конфликта очень малы - даже обычным очередям блокировки вряд ли когда-либо понадобится использовать блокировку ядра.
Если вы используете Windows 8 или Windows Server 2012, можно использовать зарегистрированный ввод-вывод, который обеспечивает более высокую пропускную способность при более низкой загрузке процессора, чем обычный IOCP; он делает это, вырезая переходы ядра, нулевую копию, среди прочего
API: http://msdn.microsoft.com/en-us/library/windows/desktop/ms740642%28v=vs.85%29.aspx
Справочная информация: http://www.serverframework.com/asynchronousevents/rio/