Потребители AMQP RabbitMQ блокируют друг друга?

Я кодировал рабочее приложение C (rabbitmq-c), которое использует очередь, опубликованную скриптом Python (pika).

У меня есть следующее странное поведение, которое я не могу решить:

  1. Запуск всех рабочих до публикации сообщений в очереди работает должным образом
  2. Старт 1 работника после публикации очереди работает как положено
  3. ОДНАКО: запуск дополнительных работников после того, как работник начал потреблять из очереди, означает, что эти работники не видят никаких сообщений в очереди (количество сообщений =0) и, следовательно, просто ждут (хотя предполагается, что в очереди будет еще много сообщений)). Убийство первого работника неожиданно приведет к отправке сообщений всем остальным (ожидающим) потребителям.

Есть идеи, что может происходить?

Я пытался убедиться, что у каждого потребителя есть свой собственный канал (это необходимо?), Но все же поведение...

Вот код для потребителя (работника):

conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",
           0,
           131072,
           0,
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");

if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");

if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,
                            0,
                            0,
                            0,
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");

LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

while(1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);

    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;

        amqp_simple_wait_frame(conn, &f);       // METHOD frame
        amqp_simple_wait_frame(conn, &f);       // HEADER frame

        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");

        while (n) {
            amqp_simple_wait_frame(conn, &f);   // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }

        // do something with buf

        LOG_INFO(" [x] Message recevied from queue\n");
    }

    amqp_destroy_envelope(&e);

    amqp_maybe_release_buffers(conn);
}

2 ответа

Решение

Проблема здесь, скорее всего, в том, что ваш потребитель предварительно выбирает все сообщения при запуске. Это стандартное поведение RabbitMQ, но вы можете уменьшить количество сообщений, предварительно выбранных потребителем, чтобы лучше распределить нагрузку между несколькими работниками.

Это просто означает, что один или несколько потребителей получат все сообщения и не оставят ни одного для новых потребителей.

Если вы примените qos к своему потребителю и ограничите предварительную выборку, скажем, 10 сообщениями. Потребитель будет ставить в очередь только 10 первых сообщений, а новые потребители могут справиться с этим.

Функция, которую вы ищете для реализации этого, называется amqp_basic_qos, и, кроме того, вы можете прочитать больше о prefetch для потребителей здесь.

Это может помочь вам

Подтверждение сообщения

Выполнение задачи может занять несколько секунд. Вы можете задаться вопросом, что произойдет, если один из потребителей начнет долгое задание и умрет с ним только частично. С нашим текущим кодом, как только RabbitMQ доставит сообщение клиенту, он немедленно удалит его из памяти. В этом случае, если вы убьете работника, мы потеряем сообщение, которое он только что обработал. Мы также потеряем все сообщения, которые были отправлены этому конкретному работнику, но еще не были обработаны.

Но мы не хотим терять никаких задач. Если работник умирает, мы бы хотели, чтобы это задание было передано другому работнику.

Чтобы убедиться, что сообщение никогда не теряется, RabbitMQ поддерживает подтверждения сообщений. Подтверждение (присвоение) отправляется обратно потребителю, чтобы сообщить RabbitMQ, что конкретное сообщение было получено, обработано и что RabbitMQ может удалить его.

Если потребитель умирает без отправки подтверждения, RabbitMQ поймет, что сообщение не было обработано полностью, и отправит его другому потребителю. Таким образом, вы можете быть уверены, что ни одно сообщение не будет потеряно, даже если рабочие иногда умирают.

Нет никаких таймаутов сообщений; RabbitMQ будет повторно доставлять сообщение, только когда рабочее соединение прекратит работу. Это хорошо, даже если обработка сообщения занимает очень и очень много времени.

Подтверждения сообщений включены по умолчанию.

Другие вопросы по тегам