Есть ли в ZeroMQ уведомление / событие обратного вызова / сообщение о поступлении данных?
Я пытаюсь интегрировать ZMQ в существующее приложение Windows, которое сильно зависит от сокетов MFC (CASyncSocket).
У меня есть поток пользовательского интерфейса CWinThread (без графического интерфейса), который асинхронно взаимодействует с сервером с помощью CAsyncSocket. Я хотел бы добавить линию связи ZMQ inproc для обработки передачи данных, полученных от сервера (на основе REQ/REP), в другие потоки в приложении.
Используя CAsyncSocket, метод OnReceive вызывается платформой MFC всякий раз, когда в сокете будут получены новые данные, которые будут получены (что может быть чрезмерным упрощением для хардкорных гуру MFC).
Есть ли такой механизм в ZMQ? Или мне нужно добавить дополнительный выделенный WorkerThread, который запускает поток пользовательского интерфейса для обработки моих сообщений ZMQ с остальной частью приложения? Трафик на обоих конвейерах минимален, поэтому я действительно не хочу создавать 2 отдельных потока, если я смогу обойтись с 1.
Обратите внимание, у меня работают основы, у меня просто проблемы с синхронизацией. Если я использую блокировку recv/send с ZMQ, это приводит к истощению моего CAsycSocket, потому что сообщения Windows никогда не обрабатываются потоком, что иногда приводит к тому, что они никогда не получают данные с сервера, который ZMQ должен доставлять. Но если я использую неблокирующие вызовы ZMQ, то поток часто заканчивается бездействием, потому что он не знает, считывать ли данные с сокета ZMQ.
3 ответа
В конечном счете, ответ - нет. В настоящее время нет обратных вызовов / уведомлений о поступлении данных в ZeroMQ, на которые вы можете ссылаться. Я также не смог найти ни одного форка, который добавляет эту функциональность.
Мне не удалось заставить ZMQ работать при использовании традиционных вызовов OnReceive, предоставляемых каркасом сокета MFC в одном потоке, и добавление 2-го потока для выделения ZMQ устранило всю цель его использования (оно используется для синхронизации потоков).
Моя работающая реализация закончилась тем, что я отбросил сокеты MFC и использовал ZMQ как для моего сервера inproc (для связи с другим потоком), так и для моего соединения с сервером TCP (не-ZMQ), а также использовал блокирующий вызов опроса (zmq_poll()) в OnIdle() метод (возвращает 1 каждый раз, чтобы создать занятый цикл). Блокирующий опрос
BOOL CMyThreaClass::OnIdle(LONG lCount)
{
UNREFERENCED_PARAMETER(lCount);
zmq_pollitem_t items [] = {
{ m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
{ m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
};
const int iZMQInfiniteTimeout(-1);
iResult = zmq_poll(&items[0], sizeof(items) / sizeof(items[0]), iZMQInfiniteTimeout);
TRACE("zmq_poll result: %d\n", iResult);
if (items[0].revents & ZMQ_POLLIN)
{
sMyStruct sMessage;
iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
TRACE("inproc recv result: %d\n", iResult);
// Process inproc messages
iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
TRACE("inproc send result: %d\n", iResult);
}
if (items[1].revents & ZMQ_POLLIN)
{
// there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
uint8_t id [256];
size_t id_size = 256;
iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
TRACE("getsockopt poll result %d:id %d\n", iResult, id);
iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
// now get our actual data
char szBuffer[1024];
int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
if (iBytesReceived > 0)
{
// process TCP data
}
}
}
Примечание. Для этого ответа необходимо использовать ZMQ 4 или более позднюю версию, поскольку более ранние версии ZMQ не будут взаимодействовать с обычным соединением через сокет TCP.
Вы могли бы позвонить zmq_recv()
с ZMQ_NOBLOCK
пометьте внутри основного цикла сообщений вашего приложения, переопределив ваш CWinApp::OnIdle()
, zmq_recv
вернется немедленно, если на сокете нет данных, ожидающих Если есть данные, обработайте их, но имейте в виду, что если вы сделаете что-то медленное, вы заставите приложение перестать отвечать на запросы.
Изменить: я не понял, что OnIdle
вызывается только один раз, когда очередь сообщений становится пустой. Но в соответствии с документацией MSDN вы можете вернуть ненулевое значение, чтобы вызывать его всегда:
- Если цикл сообщений проверяет очередь сообщений и не находит ожидающих сообщений, он вызывает
OnIdle
и поставки0
какlCount
аргумент. OnIdle
выполняет некоторую обработку и возвращает ненулевое значение, чтобы указать, что оно должно быть вызвано снова для дальнейшей обработки.- Цикл сообщений снова проверяет очередь сообщений. Если нет ожидающих сообщений, он вызывает
OnIdle
снова, увеличиваяlCount
аргумент. - В конце концов,
OnIdle
заканчивает обработку всех своих пустых задач и возвращает0
, Это говорит циклу сообщений прекратить вызовOnIdle
пока следующее сообщение не будет получено из очереди сообщений, после чего цикл простоя перезапускается с аргументом, установленным в0
,
Я также нашел эту ветку на GameDev.net, где пользователь сказал:
Все инструменты, которые я когда-либо писал в MFC, которые используют D3D, используют функцию OnIdle():
BOOL CD3DEditorApp::OnIdle(LONG lCount)
{
// Call base class first
CWinApp:OnIdle( lCount );
// game stuff...
// Always return true - this asks the framework to constantly
// call the Idle function when it isn't busy doing something
// else.
return TRUE;
}
Так что, по крайней мере, по словам одного человека, это обычная техника.
Вы можете передавать сигналы от потока к потоку с помощью пользовательских сообщений Windows. Вот пользовательское сообщение:
#define WM_MY_MESSAGE (WM_APP + 1)
Чтобы отправить его потоку с окном, используйте PostMessage или SendMessage для HWND. Добавьте его к карте сообщений окна с помощью ON_MESSAGE.
Чтобы отправить его в производный поток CWinThread без окон, используйте PostThreadMessage и получите его с ON_THREAD_MESSAGE.