Выполнение задачи от работника ZeroMQ
Довольно новый для ZeroMQ. У меня есть простая очередь REQ/REP, как показано ниже. Я использую PHP, но это не имеет значения, так как для меня подойдет любая языковая привязка. Это клиент для запроса задачи
$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
$req->send("Export Data as Zip");
echo $i . ":" . $req->recv().PHP_EOL;
И это рабочий, который фактически выполняет задачу.
$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");
echo "Server is started at port $port" . PHP_EOL;
while(true)
{
$msg = $srvr->recv();
echo "Message = " . $msg . PHP_EOL;
// Do the work here, takes 10 min, knows the count of lines added and remaining
$srvr->send($msg . " is exported as zip file" . date('H:i:s'));
}
Поскольку задача экспорта данных занимает около 10 минут, я хочу подключиться к серверу с другого клиента и получить прогресс / процент выполнения задачи. Мне интересно, если это даже правильный подход.
Я попробовал этот подход, где REQ/REP часть работает, но я ничего не получаю в PUB/SUB части
Серверная часть
$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");
// add PUB socket to publish progress
$c = new ZMQContext();
$p = new ZMQSocket($c, ZMQ::SOCKET_PUB);
$p->bind("tcp://*:5460");
echo "Server is started at port 5454" . PHP_EOL;
$prog = 0;
while(true)
{
$p->send($prog++ . '%'); // this part doesn't get to the progress client
$msg = $srvr->recv();
echo "Message = " . $msg . PHP_EOL;
sleep(2);// some long task
$srvr->send($msg . " Done zipping " . date('H:i:s'));
}
Прогресс клиент
$ctx = new ZMQContext();
$stat = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);
$stat->connect('tcp://localhost:5460');
while (true){
echo $stat->recv() . PHP_EOL; //nothing shows here
}
Запросить клиента
$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
for($i=0;$i<100;$i++){
$req->send("$i : Zip the file please");
echo $i . ":" . $req->recv().PHP_EOL; //works and get the output
}
2 ответа
Концепция осуществима, необходим некоторый тюнинг:
Все контрагенты PUB должны настроить любую подписку не по умолчанию, по крайней мере, через пустую подписку .setsockopt( ZMQ_SUBSCRIBE, "" )
то есть получать все темы (без фильтра).
Далее, обе стороны PUB и SUB должны получить .setsockopt( ZMQ_CONFLATE, 1 )
настроен, так как нет значения для заполнения и передачи всех промежуточных значений в конвейер очереди / удаления очереди, как только единственное значение находится в "последнем", самом последнем сообщении.
Всегда предпочтителен неблокирующий режим вызовов ZeroMQ (.recv( ..., flags = ZMQ_NOBLOCK )
и др.) или Poller.poll()
Предварительные тесты должны быть использованы, чтобы сначала прослушать (не) присутствие сообщения, прежде чем тратить больше усилий на чтение его контекста "из" ZeroMQ context-manager. Проще говоря, не так много случаев, когда сервисные вызовы в режиме блокировки могут хорошо работать в производственной системе.
Также некоторая дальнейшая настройка может помочь стороне PUB, в случае, если более массивная "атака" происходит из неограниченного пула объектов на стороне SUB, и PUB должен создавать / управлять / поддерживать ресурсы для каждого из этих (неограниченных) контрагентов.
Вам нужно использовать PUB/SUB только в том случае, если более одного клиента хотят получать одинаковые обновления прогресса. Просто используйте PUSH/PULL для простой, двухточечной передачи, которая работает по протоколу TCP.
Философская дискуссия
С такими проблемами, как это, есть два подхода.
- Используйте дополнительные сокеты для передачи дополнительных типов сообщений,
- Используйте только два сокета, но передавайте через них более одного типа сообщений
Вы говорите о выполнении 1). Возможно, стоит задуматься над 2), хотя я должен подчеркнуть, что я почти ничего не знаю о PHP и поэтому не знаю, существуют ли языковые функции, которые побуждают иметь отдельный клиент для запросов и выполнения.
Если вы это сделаете, вашему исходному клиенту необходим цикл (после того, как он отправил запрос) для получения нескольких сообщений, либо сообщений об обновлении прогресса, либо конечного результата. Ваш сервер, пока он выполняет 10-минутный поиск, будет регулярно отправлять сообщения об обновлении прогресса, а в конце - сообщение об окончательном результате. Вы, вероятно, будете использовать клиент PUSH/PULL для сервера, и то же самое снова для прогресса / результата от сервера обратно к клиенту.
Архитектурно более гибко следовать 2). Если у вас есть средства для отправки двух или более типов сообщений через один сокет и декодирования их на принимающей стороне, вы можете отправлять больше. Например, вы можете решить добавить сообщение "отмена" от клиента на сервер или сообщение с частичными результатами с сервера обратно клиенту. Это гораздо проще расширить, чем продолжать добавлять сокеты в вашу архитектуру просто потому, что вы хотите добавить еще один поток сообщений между клиентом и сервером. Опять же, я не знаю достаточно о PHP, чтобы сказать, что это определенно будет правильным способом сделать это на этом языке. Это, безусловно, имеет большой смысл в C, C++.
Я нахожу такие вещи, как Google Protocol Buffers (я предпочитаю ASN.1) очень полезными для такого рода вещей. Они позволяют вам определять типы сообщений, которые вы хотите отправлять, и (по крайней мере, с помощью GPB) объединять их вместе в одном "oneof" (в ASN.1 каждый использует разметку, чтобы отличать разные сообщения). GPB и ASN.1 удобны, потому что тогда вы можете использовать в своей системе разные языки, операционные системы и платформы, не беспокоясь о том, что именно отправляется. И будучи двоичными (не текстовыми), они более эффективны для сетевых подключений.