Надежность и устойчивость мажордомов
Для проекта я рассматриваю использование шаблона Majordomo для обмена сообщениями между процессами. Это должно заменить memcache (для получения информации о состоянии) и SCTP (для обмена сообщениями). Он доступен по адресу https://github.com/zeromq/majordomo
К сожалению, я не смог найти исчерпывающих примеров для клиента, брокера и работника. Так что я сделал их сам. Куски кода ниже.
Во время тестирования я почувствовал, что клиент завис. От работника не было ответа, даже если он работал правильно.
Поэтому мне интересно: я сделал что-то не так в своем коде?Я правильно использую библиотеку MDP?Готова ли эта библиотека к производственному использованию или это просто подтверждение концепции?
ZMQ и Majordomo должны быть устойчивы к приходу и уходу работников, перезапуску брокера и т. Д. Клиент никогда не должен сталкиваться с ситуацией, когда он бесконечно ждет ответа (он же "зависает"). Если брокер перезагружается, клиент должен вернуть ошибку.
клиент
Это мой код клиента. Он отправляет сообщение в службу и ждет ответа.
#include <stdio.h>
#include <czmq.h>
#include <majordomo.h>
int main(void)
{
char *endpoint = "ipc:///tmp/sock";
char *service = "MAKE COFFEE";
int res;
char *cmd;
zframe_t *address;
zmsg_t *message;
mdp_client_t *client = mdp_client_new(endpoint);
assert(client);
mdp_client_set_verbose(client);
zmsg_t *msg = zmsg_new();
assert(msg);
res = zmsg_addstr(msg, "Message");
assert(res == 0);
res = mdp_client_request(client, service, &msg);
zsock_t *client_sock = mdp_client_msgpipe(client);
res = zsock_recv(client_sock, "sm", &cmd, &message);
printf("Client (2): got command %s\n", cmd);
printf(" Response body:\n");
zmsg_print(message);
zmsg_destroy(&message);
mdp_client_destroy(&client);
return 0;
}
Маклер
Это брокер, в основном просто zactor (поток), выполняющий mdp_broker. libev используется для перехвата Ctrl-C и выхода из программы.
#include <stdio.h>
#include <ev.h>
#include <czmq.h>
#include <majordomo.h>
// intercept ctrl-c to quit software
static void sigint_cb (struct ev_loop *loop, struct ev_signal *w, int revents)
{
ev_unloop(loop, EVUNLOOP_ALL);
}
int main(void)
{
struct ev_loop *loop;
ev_signal signal_watcher;
zactor_t *broker = zactor_new(mdp_broker, "server");
zstr_send(broker, "VERBOSE");
zstr_sendx(broker, "BIND", "ipc:///tmp/sock", NULL);
fprintf(stdout, "Press Ctrl-C to stop...\n");
loop = ev_default_loop(0);
ev_signal_init(&signal_watcher, sigint_cb, SIGINT);
ev_signal_start(loop, &signal_watcher);
ev_loop(loop, 0);
ev_loop_destroy(loop);
zactor_destroy(&broker);
return 0;
}
работник
Этот работник ждет сообщений и отправляет ответы.
#include <stdio.h>
#include <czmq.h>
#include <majordomo.h>
int main(void)
{
char *service = "MAKE COFFEE";
zframe_t *address;
zmsg_t *message;
int res;
char *cmd;
char response[164];
mdp_worker_t *worker = mdp_worker_new("ipc:///tmp/sock", service);
assert(worker);
// mdp_worker_set_verbose(worker);
zsock_t *worker_sock = mdp_worker_msgpipe(worker);
assert(worker_sock);
while (1) {
cmd = zstr_recv(worker_sock);
if (!cmd) break;
fprintf(stdout, "Got command: %s\n", cmd);
res = zsock_recv(worker_sock, "fm", &address, &message);
assert(res == 0);
zmsg_print(message);
zmsg_destroy(&message);
zmsg_t *msg_response = zmsg_new();
res = zmsg_addstr(msg_response, "Hello! 1");
res = zmsg_addstr(msg_response, "Hello! 2");
assert(res == 0);
mdp_worker_send_final(worker, &address, &msg_response);
}
mdp_worker_destroy(&worker);
return 0;
}