ZMQ Xpub/Xsub не отправляет сообщения о подписке
Я экспериментирую с ZMQ, используя XPUB/XSUB
прокси. В конечном итоге я хочу вставить некоторую логику в середину, поэтому я попытался создать свой собственный прокси с помощьюzmq_poll()
вместо использования встроенного zmq_proxy
. Однако мойXPUB
никогда не получает сообщений подписки для пересылки. Код ниже работает (подписчик печатает"helloWorld"
) если я использую встроенный прокси. Это не работает с моим настраиваемым прокси. Я пробовал установить очень подробные / ручные параметры сокета и добавить задержки, но, похоже, не могу заставить его работать:
Бывший пост [Комментарий]: после дальнейшего тестирования я понял, что это просто тупая копия и прошлый баг, и что аргументы в пользу
zmq::poll
были не то, что я думал. второй аргумент - это количество элементов в списке. что должно быть 2, а не 1. - CrimsonKnights 13 дек в 5:45
void proxyBuiltIn(zmq::context_t* context)
{
zmq::socket_t frontend(*context, ZMQ_XSUB);
zmq::socket_t backend(*context, ZMQ_XPUB);
frontend.bind("inproc://frontend");
backend.bind("inproc://backend");
zmq::proxy(frontend, backend);
}
void proxyCustom(zmq::context_t* context)
{
zmq::socket_t frontend(*context, ZMQ_XSUB);
zmq::socket_t backend(*context, ZMQ_XPUB);
frontend.bind("inproc://frontend");
backend.bind("inproc://backend");
zmq::pollitem_t items[2] =
{
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 },
};
while (1)
{
if (zmq::poll(items, 1, 1000) == -1){ break;}
if (items[0].revents & ZMQ_POLLIN)
{
std::cout << "got published message" << std::endl; // won't get here because subscription is not made
zmq::multipart_t message;
message.recv(frontend);
message.send(backend);
}
if (items[1].revents & ZMQ_POLLIN)
{
std::cout << "got subscription message" << std::endl; // never gets here.
zmq::multipart_t message;
message.recv(backend);
message.send(frontend);
}
}
}
void subscriber(zmq::context_t* context)
{
zmq::socket_t subscriber(*context, ZMQ_SUB);
subscriber.connect("inproc://backend");
std::string topic = "testTopic";
subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
while (1)
{
zmq::multipart_t incoming;
incoming.recv(subscriber);
std::string topic = incoming.popstr();
std::string data = incoming.popstr();
std::cout << topic.c_str() << ", " << data.c_str() << std::endl;
}
}
void publisher(zmq::context_t* context)
{
zmq::socket_t publisher(*context, ZMQ_PUB);
publisher.connect("inproc://frontend");
while (1)
{
zmq::multipart_t message;
message.addstr("testTopic");
message.addstr("helloWorld!");
message.send(publisher);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
int main(int argc, char **argv)
{
zmq::context_t* context = new zmq::context_t(1);
std::thread(proxyCustom, context).detach(); // this does not
// std::thread(proxyBuiltIn, context).detach(); // this works
std::thread( publisher, context).detach();
std::thread(subscriber, context).detach();
while(1)
{
}
}