amqp_basic_qos не имеет никакого эффекта

Я пытаюсь кодировать простого потребителя, используя librabbitmq. Это работает, но когда я выполняю amqp_basic_consume, он потребляет всю очередь. Я хочу, чтобы он получил одно сообщение, обработал его и повторил.

Я пытался использовать basic_qos для предварительной выборки потребителя 1, но это, похоже, не дает никакого эффекта.

Базовая установка и цикл: // устанавливаем qos 1 сообщения за раз if (! Amqp_basic_qos(conn, channel, 0, 1, 0)) { die_on_amqp_error(amqp_get_rpc_reply(conn), "basic.qos"); }

// Consuming the message
amqp_basic_consume(conn, channel, queue, amqp_empty_bytes, no_local, no_ack, exclusive, amqp_empty_table);

while (run) {
    amqp_rpc_reply_t result;
    amqp_envelope_t envelope;

    amqp_maybe_release_buffers(conn);
    result = amqp_consume_message(conn, &envelope, &timeout, 0);

    if (AMQP_RESPONSE_NORMAL == result.reply_type) {

        strncpy(message, envelope.message.body.bytes, envelope.message.body.len);
        message[envelope.message.body.len] = '\0';

        printf("Received message size: %d\nbody: -%s-\n", (int) envelope.message.body.len, message );

        if ( strncmp(message, "DONE",4 ) == 0 )
        {
            printf("XXXXXXXXXXXXXXXXXX Cease message received. XXXXXXXXXXXXXXXXXXXXX\n");
            run = 0;
        }
        amqp_destroy_envelope(&envelope);
    }else{
         printf("Timeout.\n");
         run = 0;
    }
}

Я ожидаю заполнить очередь, которую я могу начать обрабатывать, и если я нажму ^C, остальные сообщения все еще находятся в очереди. Вместо этого, даже если я обработал только одно сообщение, вся очередь очищается.

1 ответ

Это поведение, когда noAck правда. Что произойдет, так это то, что сообщения будут отправлены подключенному потребителю так быстро, как только брокер сможет их отправить, поскольку предполагается, что потребитель может принять их, поскольку они были подтверждены сразу после доставки.

Вы хотели бы изменить noAck ложно, то явно ack каждое сообщение возвращается брокеру в этом случае.

В качестве альтернативы, вы можете использовать basic.get получать сообщения от брокера по одному за раз, в отличие от использования потребителя на основе push (есть люди, которым эта идея не нравится). Ваш вариант использования определит, что является наиболее подходящим, но исходя из того факта, что у вас, кажется, имеется полная очередь и сообщения, требующие большого объема обработки, я бы предположил, что basic.get было бы хорошо в этом сценарии. Тогда возникает вопрос: как часто проводить опрос, когда очередь пуста?

Другие вопросы по тегам