Можно ли использовать транспорт ZeroMQ (NetMQ) TCP между издателем и подписчиком в одном процессе?
Я думаю, что вопрос говорит обо всем на самом деле.
Для некоторой предыстории у меня есть подписчик, для которого я пытаюсь написать несколько тестов. Чтобы сделать это, я раскручиваю издателя из теста, указывая tcp://localhost:[port]
как адрес. Когда сообщение отправлено, подписчик не получает его. Вот пример кода для демонстрации:
string address = "tcp://localhost:1026";
// string address = "inproc://localhost1026";
var pubSocket = new PublisherSocket();
pubSocket.Bind(address);
var subSocket = new SubscriberSocket();
subSocket.Connect(address);
subSocket.SubscribeToAnyTopic();
pubSocket.SendFrame("Hello world!", false);
Console.WriteLine(subSocket.ReceiveFrameString()); /* <-- tcp transport
waits here forever
*/
subSocket.Dispose();
pubSocket.Dispose();
Если я изменю протокол на inproc://
тогда все хорошо. Я не хочу делать это в своих тестах, потому что я также хочу протестировать сокет монитора, и это не вызывает события для inproc://
связи (насколько я вижу).
Обратите внимание, что я использую NetMQ из кода C# (работает под.NET Framework 4.6.2).
2 ответа
О / П заслужил +1
для размещения номеров версий.NET используется. Отлично, сэр.
Можно ли использовать транспорт ZeroMQ (NetMQ) TCP между издателем и подписчиком в одном процессе?
Абсолютно,
пока нет никаких ограничений, препятствующих этому.,,
Ваше наблюдение связано с задержкой скрытой обработки, которая происходит внутри основного движка. . . внутри ZeroMQ Context()
пример.
Вещи не происходят в нулевое время, извините, не (даже (почти) особые магии, такие как Большой Взрыв, действительно заняли некоторое время, чтобы создать экземпляр нашей подлинной Вселенной. . . лучше прочитать полностью детективную книгу об этом, как опубликовано Стивен ВЕЙНБЕРГ: ПЕРВЫЕ ТРИ МИНУТЫ, Современный взгляд на происхождение Вселенной - обязательно нужно полюбить.
Что ж, можно отложить это сразу после Pieter HINTJENS, " Код подключен, том 1 " на ZeroMQ Zen-of-Zero. В этом оба смысла, много смысла не только здесь).
Итак, пока inproc://
транспортный класс имеет (почти) нулевые ресурсы, являясь чистой частной абстракцией, отображаемой в памяти (в процессе) (конечно, за исключением нескольких [ns]
- "устройства", такие как семафор / блокировка), инфраструктура ZeroMQ запускается и работает в "немедленном" режиме, tcp://
не имеет такого удобства, так как сначала должен сгенерировать все контракты, специфичные для транспортного класса, с O/S, с драйвером (ами) устройства, создать экземпляр ISO/OSI-{ L0, L1, L3, L3+ }, специфичный для транспортного класса -процессинг политики, создать соответствующий код накачки данных в Context()
RTO-состояние, выделение и отображение буфера (-ов) памяти для этих целей, так что довольно много работы предстоит сделать, прежде чем PUB
сторона переходит в состояние RTO, где она (в соответствии с более новыми версиями ~ API 4.+) также несет ответственность как за получение, так и за обработку подписки на услугу телеметрии, поскольку несет основную ответственность за SUB
-обработка TOPIC-filterlist.
Вот почему это привело к зависанию .recv( ..., ZMQ_BLOCK )
похоронен внутри завернутой абстракции NetMQ subSocket.ReceiveFrameString()
,
Чтобы проверить это, просто сделайте измененный тест:
// --------------------------------------------------- // DEMO PSEUDO-CODE
string rF = "";
while True:
pubSocket.SendFrame( "Hello world!", false ); // keep sending ...
// also may count++
// so as to "show" how
// many loops it took
rF = subSocket.ReceiveFrameString( false ); // non-blocking mode ~
// .recv( ZMQ_NOBLOCK )
// or may use Poll()
// to just sniff for
// a message presence
if rF == "":
continue; // ---^ LOOP NEXT, AS DID .recv() NOTHING YET
break; // ----------v BREAK AS DID .recv() MESSAGE
// ----------------------------------------------------------------------
Console.WriteLine( rF );
Здесь можно приложить больше усилий и поэкспериментировать, чтобы увидеть ключевые роли обоих .connect()
связанные с этим накладные расходы, а также необходимость не пропустить SUB
со стороны сигнализировала телеметрия о настройке подписки (подписей) + необходимость ее получения + повторная обработка на PUB
с другой стороны, прежде чем какое-либо сообщение будет отправлено в первую очередь к предполагаемому, иначе просто " навсегда ", SUB
"Толстый"- достаточно .sleep( someGuestimateTIME )
предложенный уже Hesam Faridmehr, после SUB
сторона имеет оба .connect()
плюс это .setsockopt( ZMQ_SUBSCRIBE )
(который должен быть сначала доставлен, а также должным образом обработан на PUB
со стороны (для правильной настройки процессора списка процессов TOPIC-фильтра), чего вполне достаточно перед первым PUB.send()
замаскирует основную причину, сделав ее "косвенно" заблокированной, и выполнение кода останавливается, вместо того чтобы сделать решение достаточно умным - используя неблокирующую форму Poll()
например - для профессионального передового опыта проектирования распределенных систем, где можно действительно повиноваться сборке хакеров, любимая первая строка настоящего макроса #ASSUME NOTHING;
,
Так как pub/sub
картина как радио. Издатель не будет ждать, пока подписчик подключится, он будет игнорировать отправку, если подписчик отсутствует. Вы можете проверить это, просто добавив Thread.Sleep(1000);
после subSocket.SubscribeToAnyTopic();
линии, и вы увидите, что получите сообщение.
В inproc
время связывания меньше tcp
вот почему вы получаете сообщение
И в inproc,
издатель должен быть включен до того, как подписчик подключится