Надежность и устойчивость мажордомов

Для проекта я рассматриваю использование шаблона Majordomo для обмена сообщениями между процессами. Это должно заменить memcache (для получения информации о состоянии) и SCTP (для обмена сообщениями). Он доступен по адресу https://github.com/zeromq/majordomo

К сожалению, я не смог найти исчерпывающих примеров для клиента, брокера и работника. Так что я сделал их сам. Куски кода ниже.

Во время тестирования я почувствовал, что клиент завис. От работника не было ответа, даже если он работал правильно.

Поэтому мне интересно: я сделал что-то не так в своем коде?Я правильно использую библиотеку MDP?Готова ли эта библиотека к производственному использованию или это просто подтверждение концепции?

ZMQ и Majordomo должны быть устойчивы к приходу и уходу работников, перезапуску брокера и т. Д. Клиент никогда не должен сталкиваться с ситуацией, когда он бесконечно ждет ответа (он же "зависает"). Если брокер перезагружается, клиент должен вернуть ошибку.

клиент

Это мой код клиента. Он отправляет сообщение в службу и ждет ответа.

#include <stdio.h>
#include <czmq.h>
#include <majordomo.h>

int main(void)
{
    char *endpoint = "ipc:///tmp/sock";
    char *service = "MAKE COFFEE";

    int res;
    char *cmd;
    zframe_t *address;
    zmsg_t *message;

    mdp_client_t *client = mdp_client_new(endpoint);
    assert(client);

    mdp_client_set_verbose(client);

    zmsg_t *msg = zmsg_new();
    assert(msg);
    res = zmsg_addstr(msg, "Message");
    assert(res == 0);

    res = mdp_client_request(client, service, &msg);

    zsock_t *client_sock = mdp_client_msgpipe(client);
    res = zsock_recv(client_sock, "sm", &cmd, &message);

    printf("Client (2): got command %s\n", cmd);
    printf(" Response body:\n");
    zmsg_print(message);
    zmsg_destroy(&message);

    mdp_client_destroy(&client);
    return 0;
}

Маклер

Это брокер, в основном просто zactor (поток), выполняющий mdp_broker. libev используется для перехвата Ctrl-C и выхода из программы.

#include <stdio.h>
#include <ev.h>
#include <czmq.h>
#include <majordomo.h>

// intercept ctrl-c to quit software
static void sigint_cb (struct ev_loop *loop, struct ev_signal *w, int revents)
{
    ev_unloop(loop, EVUNLOOP_ALL);
}

int main(void)
{
    struct ev_loop *loop;
    ev_signal signal_watcher;

    zactor_t *broker = zactor_new(mdp_broker, "server");
    zstr_send(broker, "VERBOSE");
    zstr_sendx(broker, "BIND", "ipc:///tmp/sock", NULL);

    fprintf(stdout, "Press Ctrl-C to stop...\n");

    loop = ev_default_loop(0);

    ev_signal_init(&signal_watcher, sigint_cb, SIGINT);
    ev_signal_start(loop, &signal_watcher);

    ev_loop(loop, 0);
    ev_loop_destroy(loop);

    zactor_destroy(&broker);
    return 0;
}

работник

Этот работник ждет сообщений и отправляет ответы.

#include <stdio.h>
#include <czmq.h>
#include <majordomo.h>

int main(void)
{
    char *service = "MAKE COFFEE";
    zframe_t *address;
    zmsg_t *message;
    int res;
    char *cmd;
    char response[164];

    mdp_worker_t *worker = mdp_worker_new("ipc:///tmp/sock", service);
    assert(worker);
    // mdp_worker_set_verbose(worker);

    zsock_t *worker_sock = mdp_worker_msgpipe(worker);
    assert(worker_sock);

    while (1) {

        cmd = zstr_recv(worker_sock);
        if (!cmd) break;

        fprintf(stdout, "Got command: %s\n", cmd);

        res = zsock_recv(worker_sock, "fm", &address, &message);
        assert(res == 0);

        zmsg_print(message);
        zmsg_destroy(&message);

        zmsg_t *msg_response = zmsg_new();
        res = zmsg_addstr(msg_response, "Hello! 1");
        res = zmsg_addstr(msg_response, "Hello! 2");
        assert(res == 0);

        mdp_worker_send_final(worker, &address, &msg_response);
    }

    mdp_worker_destroy(&worker);
    return 0;
}

0 ответов

Другие вопросы по тегам