Можно ли использовать транспорт ZeroMQ (NetMQ) TCP между издателем и подписчиком в одном процессе?

Я думаю, что вопрос говорит обо всем на самом деле.

Для некоторой предыстории у меня есть подписчик, для которого я пытаюсь написать несколько тестов. Чтобы сделать это, я раскручиваю издателя из теста, указывая tcp://localhost:[port] как адрес. Когда сообщение отправлено, подписчик не получает его. Вот пример кода для демонстрации:

   string address   =    "tcp://localhost:1026";
// string address   = "inproc://localhost1026";

   var    pubSocket =  new PublisherSocket();
   pubSocket.Bind(address);

   var subSocket = new SubscriberSocket();
   subSocket.Connect(address);
   subSocket.SubscribeToAnyTopic();

   pubSocket.SendFrame("Hello world!", false);

   Console.WriteLine(subSocket.ReceiveFrameString()); /* <-- tcp transport 
                                                         waits here forever
                                                         */
   subSocket.Dispose();
   pubSocket.Dispose();

Если я изменю протокол на inproc:// тогда все хорошо. Я не хочу делать это в своих тестах, потому что я также хочу протестировать сокет монитора, и это не вызывает события для inproc:// связи (насколько я вижу).

Обратите внимание, что я использую NetMQ из кода C# (работает под.NET Framework 4.6.2).

2 ответа

О / П заслужил +1для размещения номеров версий.NET используется. Отлично, сэр.

Можно ли использовать транспорт ZeroMQ (NetMQ) TCP между издателем и подписчиком в одном процессе?

Абсолютно,
пока нет никаких ограничений, препятствующих этому.,,

Ваше наблюдение связано с задержкой скрытой обработки, которая происходит внутри основного движка. . . внутри ZeroMQ Context() пример.

Вещи не происходят в нулевое время, извините, не (даже (почти) особые магии, такие как Большой Взрыв, действительно заняли некоторое время, чтобы создать экземпляр нашей подлинной Вселенной. . . лучше прочитать полностью детективную книгу об этом, как опубликовано Стивен ВЕЙНБЕРГ: ПЕРВЫЕ ТРИ МИНУТЫ, Современный взгляд на происхождение Вселенной - обязательно нужно полюбить.

Что ж, можно отложить это сразу после Pieter HINTJENS, " Код подключен, том 1 " на ZeroMQ Zen-of-Zero. В этом оба смысла, много смысла не только здесь).

Итак, пока inproc:// транспортный класс имеет (почти) нулевые ресурсы, являясь чистой частной абстракцией, отображаемой в памяти (в процессе) (конечно, за исключением нескольких [ns] - "устройства", такие как семафор / блокировка), инфраструктура ZeroMQ запускается и работает в "немедленном" режиме, tcp:// не имеет такого удобства, так как сначала должен сгенерировать все контракты, специфичные для транспортного класса, с O/S, с драйвером (ами) устройства, создать экземпляр ISO/OSI-{ L0, L1, L3, L3+ }, специфичный для транспортного класса -процессинг политики, создать соответствующий код накачки данных в Context() RTO-состояние, выделение и отображение буфера (-ов) памяти для этих целей, так что довольно много работы предстоит сделать, прежде чем PUB сторона переходит в состояние RTO, где она (в соответствии с более новыми версиями ~ API 4.+) также несет ответственность как за получение, так и за обработку подписки на услугу телеметрии, поскольку несет основную ответственность за SUB -обработка TOPIC-filterlist.

Вот почему это привело к зависанию .recv( ..., ZMQ_BLOCK ) похоронен внутри завернутой абстракции NetMQ subSocket.ReceiveFrameString(),

Чтобы проверить это, просто сделайте измененный тест:

  // --------------------------------------------------- // DEMO PSEUDO-CODE
     string rF = "";

     while True:

           pubSocket.SendFrame( "Hello world!", false ); // keep sending ...
                                                         // also may count++
                                                         // so as to "show" how
                                                         // many loops it took
           rF = subSocket.ReceiveFrameString(   false ); // non-blocking mode ~
                                                         // .recv(  ZMQ_NOBLOCK )
                                                         // or may use Poll()
                                                         // to just sniff for
                                                         // a message presence
           if  rF == "":
               continue; // ---^ LOOP NEXT, AS DID .recv() NOTHING YET
           break; // ----------v BREAK      AS DID .recv() MESSAGE
// ----------------------------------------------------------------------
   Console.WriteLine( rF );

Здесь можно приложить больше усилий и поэкспериментировать, чтобы увидеть ключевые роли обоих .connect() связанные с этим накладные расходы, а также необходимость не пропустить SUBсо стороны сигнализировала телеметрия о настройке подписки (подписей) + необходимость ее получения + повторная обработка на PUB с другой стороны, прежде чем какое-либо сообщение будет отправлено в первую очередь к предполагаемому, иначе просто " навсегда ", SUB


"Толстый"- достаточно .sleep( someGuestimateTIME ) предложенный уже Hesam Faridmehr, после SUB сторона имеет оба .connect() плюс это .setsockopt( ZMQ_SUBSCRIBE ) (который должен быть сначала доставлен, а также должным образом обработан на PUB со стороны (для правильной настройки процессора списка процессов TOPIC-фильтра), чего вполне достаточно перед первым PUB.send() замаскирует основную причину, сделав ее "косвенно" заблокированной, и выполнение кода останавливается, вместо того чтобы сделать решение достаточно умным - используя неблокирующую форму Poll() например - для профессионального передового опыта проектирования распределенных систем, где можно действительно повиноваться сборке хакеров, любимая первая строка настоящего макроса #ASSUME NOTHING;,

Так как pub/sub картина как радио. Издатель не будет ждать, пока подписчик подключится, он будет игнорировать отправку, если подписчик отсутствует. Вы можете проверить это, просто добавив Thread.Sleep(1000); после subSocket.SubscribeToAnyTopic(); линии, и вы увидите, что получите сообщение.

В inprocвремя связывания меньше tcp вот почему вы получаете сообщение

И в inproc, издатель должен быть включен до того, как подписчик подключится

Другие вопросы по тегам