Есть ли в ZeroMQ уведомление / событие обратного вызова / сообщение о поступлении данных?

Я пытаюсь интегрировать ZMQ в существующее приложение Windows, которое сильно зависит от сокетов MFC (CASyncSocket).

У меня есть поток пользовательского интерфейса CWinThread (без графического интерфейса), который асинхронно взаимодействует с сервером с помощью CAsyncSocket. Я хотел бы добавить линию связи ZMQ inproc для обработки передачи данных, полученных от сервера (на основе REQ/REP), в другие потоки в приложении.

Используя CAsyncSocket, метод OnReceive вызывается платформой MFC всякий раз, когда в сокете будут получены новые данные, которые будут получены (что может быть чрезмерным упрощением для хардкорных гуру MFC).

Есть ли такой механизм в ZMQ? Или мне нужно добавить дополнительный выделенный WorkerThread, который запускает поток пользовательского интерфейса для обработки моих сообщений ZMQ с остальной частью приложения? Трафик на обоих конвейерах минимален, поэтому я действительно не хочу создавать 2 отдельных потока, если я смогу обойтись с 1.

Обратите внимание, у меня работают основы, у меня просто проблемы с синхронизацией. Если я использую блокировку recv/send с ZMQ, это приводит к истощению моего CAsycSocket, потому что сообщения Windows никогда не обрабатываются потоком, что иногда приводит к тому, что они никогда не получают данные с сервера, который ZMQ должен доставлять. Но если я использую неблокирующие вызовы ZMQ, то поток часто заканчивается бездействием, потому что он не знает, считывать ли данные с сокета ZMQ.

3 ответа

Решение

В конечном счете, ответ - нет. В настоящее время нет обратных вызовов / уведомлений о поступлении данных в ZeroMQ, на которые вы можете ссылаться. Я также не смог найти ни одного форка, который добавляет эту функциональность.

Мне не удалось заставить ZMQ работать при использовании традиционных вызовов OnReceive, предоставляемых каркасом сокета MFC в одном потоке, и добавление 2-го потока для выделения ZMQ устранило всю цель его использования (оно используется для синхронизации потоков).

Моя работающая реализация закончилась тем, что я отбросил сокеты MFC и использовал ZMQ как для моего сервера inproc (для связи с другим потоком), так и для моего соединения с сервером TCP (не-ZMQ), а также использовал блокирующий вызов опроса (zmq_poll()) в OnIdle() метод (возвращает 1 каждый раз, чтобы создать занятый цикл). Блокирующий опрос

BOOL CMyThreaClass::OnIdle(LONG lCount)
{
    UNREFERENCED_PARAMETER(lCount);

    zmq_pollitem_t items [] = {
    { m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
    { m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
    };
    const int iZMQInfiniteTimeout(-1);
    iResult = zmq_poll(&items[0], sizeof(items) / sizeof(items[0]), iZMQInfiniteTimeout);
    TRACE("zmq_poll result: %d\n", iResult);

    if (items[0].revents & ZMQ_POLLIN)
    {
        sMyStruct sMessage;
        iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
        TRACE("inproc recv result: %d\n", iResult);
        //  Process inproc messages
        iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
        TRACE("inproc send result: %d\n", iResult);
    }
    if (items[1].revents & ZMQ_POLLIN)
    {
        // there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
        uint8_t id [256];
        size_t id_size = 256;
        iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
        TRACE("getsockopt poll result %d:id %d\n", iResult, id);
        iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
        // now get our actual data
        char szBuffer[1024];
        int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
        if (iBytesReceived > 0)
        {
            // process TCP data
        }
    }
}

Примечание. Для этого ответа необходимо использовать ZMQ 4 или более позднюю версию, поскольку более ранние версии ZMQ не будут взаимодействовать с обычным соединением через сокет TCP.

Вы могли бы позвонить zmq_recv() с ZMQ_NOBLOCK пометьте внутри основного цикла сообщений вашего приложения, переопределив ваш CWinApp::OnIdle(), zmq_recv вернется немедленно, если на сокете нет данных, ожидающих Если есть данные, обработайте их, но имейте в виду, что если вы сделаете что-то медленное, вы заставите приложение перестать отвечать на запросы.

Изменить: я не понял, что OnIdle вызывается только один раз, когда очередь сообщений становится пустой. Но в соответствии с документацией MSDN вы можете вернуть ненулевое значение, чтобы вызывать его всегда:

  1. Если цикл сообщений проверяет очередь сообщений и не находит ожидающих сообщений, он вызывает OnIdle и поставки 0 как lCount аргумент.
  2. OnIdle выполняет некоторую обработку и возвращает ненулевое значение, чтобы указать, что оно должно быть вызвано снова для дальнейшей обработки.
  3. Цикл сообщений снова проверяет очередь сообщений. Если нет ожидающих сообщений, он вызывает OnIdle снова, увеличивая lCount аргумент.
  4. В конце концов, OnIdle заканчивает обработку всех своих пустых задач и возвращает 0, Это говорит циклу сообщений прекратить вызов OnIdle пока следующее сообщение не будет получено из очереди сообщений, после чего цикл простоя перезапускается с аргументом, установленным в 0,

Я также нашел эту ветку на GameDev.net, где пользователь сказал:

Все инструменты, которые я когда-либо писал в MFC, которые используют D3D, используют функцию OnIdle():

BOOL CD3DEditorApp::OnIdle(LONG lCount) 
{
    // Call base class first
    CWinApp:OnIdle( lCount );

    // game stuff...

    // Always return true - this asks the framework to constantly
    // call the Idle function when it isn't busy doing something
    // else.
    return TRUE;
}

Так что, по крайней мере, по словам одного человека, это обычная техника.

Вы можете передавать сигналы от потока к потоку с помощью пользовательских сообщений Windows. Вот пользовательское сообщение:

#define WM_MY_MESSAGE (WM_APP + 1)

Чтобы отправить его потоку с окном, используйте PostMessage или SendMessage для HWND. Добавьте его к карте сообщений окна с помощью ON_MESSAGE.

Чтобы отправить его в производный поток CWinThread без окон, используйте PostThreadMessage и получите его с ON_THREAD_MESSAGE.

Другие вопросы по тегам