Дилер ZeroMQ - высокая задержка по сравнению с winsock

Моя компания рассматривает возможность использования ZeroMQ в качестве транспортного механизма. Сначала я оценил производительность, чтобы понять, с чем я играю.

Поэтому я создал приложение, сравнивающее настройку zmq от дилера к winsock. Я измерил циклическую отправку синхронных сообщений от клиента на сервер и затем вычисление среднего значения.

Вот сервер под управлением winsock:

DWORD RunServerWINSOCKTest(DWORD dwPort)
{
    WSADATA wsaData;
    int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iRet != NO_ERROR)
    {
        printf("WSAStartup failed with error: %d\n", iRet);
        return iRet;
    }

    struct addrinfo hints;
    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    struct addrinfo *result = NULL;
    iRet = getaddrinfo(NULL, std::to_string(dwPort).c_str(), &hints, &result);
    if (iRet != 0)
    {
        WSACleanup();
        return iRet;
    }

    SOCKET ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
    if (ListenSocket == INVALID_SOCKET)
    {
        freeaddrinfo(result);
        WSACleanup();
        return WSAGetLastError();
    }

    iRet = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
    if (iRet == SOCKET_ERROR)
    {
        freeaddrinfo(result);
        closesocket(ListenSocket);
        WSACleanup();
        return WSAGetLastError();
    }

    freeaddrinfo(result);
    iRet = listen(ListenSocket, SOMAXCONN);
    if (iRet == SOCKET_ERROR)
    {
        closesocket(ListenSocket);
        WSACleanup();
        return WSAGetLastError();
    }

    while (true)
    {
        SOCKET ClientSocket = accept(ListenSocket, NULL, NULL);
        if (ClientSocket == INVALID_SOCKET)
        {
            closesocket(ListenSocket);
            WSACleanup();
            return WSAGetLastError();
        }
        char value = 0;
        setsockopt(ClientSocket, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));

        char recvbuf[DEFAULT_BUFLEN];
        int recvbuflen = DEFAULT_BUFLEN;
        do {

            iRet = recv(ClientSocket, recvbuf, recvbuflen, 0);
            if (iRet > 0) {
            // Echo the buffer back to the sender
                int iSendResult = send(ClientSocket, recvbuf, iRet, 0);
                if (iSendResult == SOCKET_ERROR)
                {
                    closesocket(ClientSocket);
                    WSACleanup();
                    return WSAGetLastError();
                }
            }
            else if (iRet == 0)
                printf("Connection closing...\n");
            else  {
                closesocket(ClientSocket);
                WSACleanup();
                return 1;
            }

        } while (iRet > 0);

        iRet = shutdown(ClientSocket, SD_SEND);
        if (iRet == SOCKET_ERROR)
        {
            closesocket(ClientSocket);
            WSACleanup();
            return WSAGetLastError();
        }
        closesocket(ClientSocket);
    }
    closesocket(ListenSocket);

    return WSACleanup();
}

Вот клиент под управлением winsock:

DWORD RunClientWINSOCKTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
    WSADATA wsaData;
    int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iRet != NO_ERROR)
    {
        return iRet;
    }

    SOCKET ConnectSocket = INVALID_SOCKET;
    struct addrinfo *result = NULL,  *ptr = NULL, hints;


    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    int iResult = getaddrinfo(strAddress.c_str(), std::to_string(dwPort).c_str(), &hints, &result);
    if (iResult != 0) {
        WSACleanup();
        return 1;
    }

    for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {
        ConnectSocket = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (ConnectSocket == INVALID_SOCKET) {
            WSACleanup();
            return 1;
        }

        iResult = connect(ConnectSocket, ptr->ai_addr, (int)ptr->ai_addrlen);
        if (iResult == SOCKET_ERROR) {
            closesocket(ConnectSocket);
            ConnectSocket = INVALID_SOCKET;
            continue;
        }
        break;
    }

    freeaddrinfo(result);

    if (ConnectSocket == INVALID_SOCKET) {
        WSACleanup();
        return 1;
    }


    // Statistics
    UINT64 uint64BytesTransmitted = 0;
    UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
    UINT64 uint64WaitForResponse = 0;

    DWORD dwMessageCount = 1000000;

    CHAR cRecvMsg[DEFAULT_BUFLEN];
    SecureZeroMemory(&cRecvMsg, DEFAULT_BUFLEN);

    std::string strSendMsg(dwMessageSize, 'X');

    for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
    {
        int iRet = send(ConnectSocket, strSendMsg.data(), strSendMsg.size(), 0);
        if (iRet == SOCKET_ERROR) {
            closesocket(ConnectSocket);
            WSACleanup();
            return 1;
        }
        uint64BytesTransmitted += strSendMsg.size();

        UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
        iRet = recv(ConnectSocket, cRecvMsg, DEFAULT_BUFLEN, 0);
        if (iRet < 1)
        {
            closesocket(ConnectSocket);
            WSACleanup();
            return 1;
        }
        std::string strMessage(cRecvMsg);

        if (strMessage.compare(strSendMsg) == 0)
        {
            uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
        }
        else
        {
            return NO_ERROR;
        }
}

    UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
    PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);

    iResult = shutdown(ConnectSocket, SD_SEND);
    if (iResult == SOCKET_ERROR) {
        closesocket(ConnectSocket);
        WSACleanup();
        return 1;
    }
    closesocket(ConnectSocket);
    return WSACleanup();
}

Вот сервер под управлением ZMQ (дилер)

DWORD RunServerZMQTest(DWORD dwPort)
{
    try
    {
        zmq::context_t context(1);
        zmq::socket_t server(context, ZMQ_DEALER);

        // Set options here
        std::string strIdentity = s_set_id(server);
        printf("Created server connection with ID: %s\n", strIdentity.c_str());

        std::string strConnect = "tcp://*:" + std::to_string(dwPort);
        server.bind(strConnect.c_str());

        bool bRunning = true;
        while (bRunning)
        {
            std::string strMessage = s_recv(server);

            if (!s_send(server, strMessage))
            {
                return NO_ERROR;
            }
        }
    }
    catch (zmq::error_t& e)
    {
        return (DWORD)e.num();
    }

return NO_ERROR;

}

Вот клиент под управлением ZMQ (дилер)

DWORD RunClientZMQTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
    try
    {
        zmq::context_t ctx(1);
        zmq::socket_t client(ctx, ZMQ_DEALER); // ZMQ_REQ

        // Set options here
        std::string strIdentity = s_set_id(client);

        std::string strConnect = "tcp://" + strAddress + ":" + std::to_string(dwPort);
        client.connect(strConnect.c_str());

        if(s_send(client, "INIT"))
        {
            std::string strMessage = s_recv(client);
            if (strMessage.compare("INIT") == 0)
            {
                printf("Client[%s] connected to: %s\n", strIdentity.c_str(), strConnect.c_str());
            }
            else
            {
                return NO_ERROR;
            }
        }
        else
        {
            return NO_ERROR;
        }


        // Statistics
        UINT64 uint64BytesTransmitted   = 0;
        UINT64 uint64StartTime          = s_TimeStampGenerator.GetHighResolutionTimeStamp();
        UINT64 uint64WaitForResponse    = 0;

        DWORD dwMessageCount = 10000000;


        std::string strSendMsg(dwMessageSize, 'X');
        for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
        {
            if (s_send(client, strSendMsg))
            {
                uint64BytesTransmitted += strSendMsg.size();

                UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
                std::string strRecvMsg = s_recv(client);
                if (strRecvMsg.compare(strSendMsg) == 0)
                {
                    uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
                }
                else
                {
                    return NO_ERROR;
                }
            }
            else
            {
                return NO_ERROR;
            }
        }
        UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
        PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
    }
    catch (zmq::error_t& e)
    {
        return (DWORD)e.num();
    }

    return NO_ERROR;
    }

Я запускаю тест локально с размером сообщения 5 байт, и я получаю следующий результат:

WINSOCK

Messages sent:                 1 000 000
Time elapsed (us):            48 019 415
Time elapsed (s):                     48.019 415
Message size (bytes):                  5
Msg/s:                            20 825
Bytes/s:                         104 125
Mb/s:                                  0.099
Total   response time (us):   24 537 376
Average repsonse time (us):           24.0

а также

ZeroMQ

Messages sent:                 1 000 000
Time elapsed (us):           158 290 708
Time elapsed (s):                    158.290 708    
Message size (bytes):                  5
Msg/s:                             6 317
Bytes/s:                          31 587
Mb/s:                                  0.030
Total   response time (us):  125 524 178    
Average response time (us):          125.0

Кто-нибудь может объяснить, почему среднее время отклика намного выше при использовании ZMQ?

Цель состоит в том, чтобы найти настройки, где я могу отправлять и получать сообщения асинхронно без необходимости отвечать. Если это может быть достигнуто с другой настройкой, чем у дилера-дилера, пожалуйста, дайте мне знать!

3 ответа

Решение

Это всего лишь ответ на небольшую часть вашего вопроса, но здесь идет -

Зачем вам нужен дилер / дилер? Я предполагаю, потому что общение может начаться с любой точки? Вы не привязаны к дилеру / дилеру, в частности, он ограничивает вас только двумя конечными точками, если вы когда-либо добавите другую конечную точку на любой стороне сообщения, скажем, второго клиента, то каждый клиент будет получать только половину сообщений, потому что дилер строго круговой

Что вам нужно для асинхронной связи, так это некоторая комбинация разъемов дилера и / или маршрутизатора. Ни один из них не требует ответа, основные различия заключаются в том, как они выбирают, с какого подключенного партнера отправлять сообщение:

  • Дилер, как сказано, строго круговой, он отправит каждому подключенному равноправному абоненту последовательно
  • Маршрутизатор является строго адресным сообщением, вам нужно знать "имя" однорангового узла, которому вы хотите отправить, чтобы получить сообщение там.

Эти два типа сокетов работают вместе, потому что сокеты дилера (и сокеты запроса, дилер является сокетом типа запроса) отправляют свое "имя" как часть сообщения, которое сокет маршрутизатора может использовать для отправки данных обратно. Это парадигма запрос / ответ, и вы увидите, что такая парадигма применяется во всех примерах в руководстве, но вы можете отогнуть эту парадигму до того, что вы ищете, в частности ни дилер, ни маршрутизатор не требуют ответа,

Не зная ваших полных требований, я не могу сказать вам, какую архитектуру ZMQ я бы выбрал, но в целом я предпочитаю расширяемость сокетов маршрутизатора, проще обрабатывать соответствующую адресацию, чем просто объединить все в один узел... вы увидите предупреждения против использования роутера / роутера, и я согласен с ними в той степени, в которой вы должны понимать, что вы делаете, прежде чем пытаться это сделать, но, понимая, что вы делаете, реализация не так сложна.


У вас также есть возможность, если это соответствует вашим требованиям, установить на каждом конце сокет паба, а на каждом - под сокет, если буквально никогда не было ответов. Если это строго поток данных от источника к цели, и ни одному из партнеров не нужна обратная связь о том, что он отправляет, то это, вероятно, лучший выбор, даже если это означает, что вы имеете дело с двумя сокетами на конец, а не с одним.


Ничто из этого напрямую не связано с производительностью, но важно понимать, что сокеты zmq оптимизированы для конкретных случаев использования, и, как указано в ответе Джона Джеффериса, вы нарушаете этот сценарий использования для сокета своего дилера, отправляя сообщения в ваш тест строго синхронный. Первое, с чего нужно начать, - завершить работу с архитектурой ZMQ, а затем смоделировать реальный поток сообщений, в частности, не добавляя произвольные ожидания и синхронность, что обязательно изменит внешний вид пропускной способности при ее тестировании, в значительной степени по определению.

Вы говорите, что хотите отправлять и получать сообщения асинхронно без необходимости отвечать. Тем не менее, все проведенные тесты являются полностью синхронными, по сути, запрос-ответ, но на розетке дилер-дилер. Что-то там не вычисляется. Почему бы не запустить тесты, которые более точно имитируют дизайн, к которому вы стремитесь?

ZeroMQ получает достаточное количество производительности "быстрее, чем TCP", объединяя сообщения в очереди в одно сообщение. Очевидно, что этот механизм не может быть активирован в чисто синхронном режиме с одним сообщением в полете за раз.

Что касается того, почему этот конкретный тест очень маленьких сообщений, отправляемых и принимаемых исключительно синхронно, является относительно медленным, я не могу сказать. Вы сделали профилирование? Я снова скажу, что проведение этого теста и принятие решений на его основе не имеет смысла, если он не похож на ваш окончательный дизайн.

Одна вещь, которая выглядит странно, это блок try/catch в коде ZeroMQ. Это не выглядит справедливым, потому что тест winsock не был написан таким образом. Известно, что в try/catch есть / было много накладных расходов.

Проблема с OP связана с пропускной способностью, а не с задержкой, и, вероятно, связана с шаблоном, который используется в приведенных примерах. Тем не менее, вы, вероятно, всегда найдете это ZeroMQ имеет большую задержку, что я объясню, хотя это может быть бесполезно для OP в этой ситуации.

ZeroMQ работает путем буферизации сообщений. Представьте себе (просто в качестве базовой иллюстрации) создание std::string и добавление к нему множества маленьких строк (многие тысячи, каждая из которых включает небольшой заголовок, чтобы узнать размер этих маленьких сегментов), а затем отправка этой большей строки с интервалами 100us, 1000us, 10ms или что угодно. На принимающей стороне большая строка принимается, и каждое меньшее сообщение удаляется по одному на основе размера заголовка, который отправляется вместе с ним. Это позволяет потенциально отправлять миллионы сообщений партиями (хотя std::string очевидно, это плохой выбор) без дополнительных затрат на отправку этих миллионов очень маленьких измерений по одному. В результате вы в полной мере используете свои сетевые ресурсы и увеличиваете пропускную способность, а также создаете базовые FIFO поведение. Однако вы также создаете задержку для заполнения буфера, что означает увеличение задержки.

Представьте себе (опять же, просто в качестве базовой иллюстрации): если вы потратите полсекунды (включая строковые операции и т. Д.) На буферизацию миллиона сообщений, это приведет к увеличению строки в несколько мегабайт. Современные сети могут легко отправить эту большую строку в оставшиеся полсекунды. 1000000us (1 секунда) / 1000000 сообщений будет 1us за сообщение, верно? Неверно - у всех сообщений была задержка в полсекунды, чтобы заполнить очередь, что привело к увеличению задержки до половины секунды для всех сообщений. ZeroMQ отправляет партии намного быстрее, чем каждый 500ms, но увеличение задержки, которое это иллюстрирует, все еще происходит в ZeroMQ хотя это обычно по линии нескольких ms,

Другие вопросы по тегам