Ожидание одного сообщения RabbitMQ с таймаутом

Я хотел бы отправить сообщение на сервер RabbitMQ, а затем дождаться ответного сообщения (в очереди для ответа). Конечно, я не хочу ждать вечно, если приложение, обрабатывающее эти сообщения, не работает - должен быть тайм-аут. Это звучит как очень простая задача, но я не могу найти способ сделать это. Теперь я столкнулся с этой проблемой как с py-amqplib, так и с клиентом RabbitMQ .NET.

Лучшее решение, которое у меня есть, это опрос basic_get с sleep промежуточный, но это довольно уродливо

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)

Конечно, есть какой-то лучший способ?

5 ответов

Решение

Я только что добавил поддержку тайм-аута amqplib в carrot,

Это подкласс amqplib.client0_8.Connection:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py

wait_multi это версия channel.wait возможность получать по произвольному количеству каналов.

Я предполагаю, что это может быть объединено вверх по течению в некоторый момент.

Вот что я сделал в.NET-клиенте:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
    var consumer = new QueueingBasicConsumer(Channel);
    var tag = Channel.BasicConsume(queueName, true, null, consumer);
    try
    {
        object result;
        if (!consumer.Queue.Dequeue(timeoutMs, out result))
            throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));

        return ((BasicDeliverEventArgs)result).Body;
    }
    finally
    {
        Channel.BasicCancel(tag);
    }
}

К сожалению, я не могу сделать то же самое с py-amqplib, потому что его basic_consume метод не вызывает обратный вызов, пока вы не позвоните channel.wait() а также channel.wait() не поддерживает тайм-ауты! Это глупое ограничение (с которым я постоянно сталкиваюсь) означает, что если вы никогда не получите другое сообщение, ваш поток будет заморожен навсегда.

Здесь есть пример использования qpid с msg = q.get(timeout=1) это должно делать то, что вы хотите. Извините, я не знаю, какие другие клиентские библиотеки AMQP реализуют тайм-ауты (и, в частности, я не знаю двух конкретных, которые вы упомянули).

Кажется, это разрушает всю идею асинхронной обработки, но если вам нужно, я думаю, что правильный способ сделать это - использовать RpcClient.

Кролик теперь позволяет вам добавлять события тайм-аута. Просто оберните ваш код в try catch, а затем выбросите исключения в обработчики TimeOut и Disconnect:

try{
    using (IModel channel = rabbitConnection.connection.CreateModel())
    {
        client = new SimpleRpcClient(channel, "", "", queue);
        client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity
        client.TimedOut += RpcTimedOutHandler;
        client.Disconnected += RpcDisconnectedHandler;
        byte[] replyMessageBytes = client.Call(message);
        return replyMessageBytes;
    }
}
catch (Exception){
    //Handle timeout and disconnect here
}
private void RpcDisconnectedHandler(object sender, EventArgs e)
{
     throw new Exception("RPC disconnect exception occured.");
}

private void RpcTimedOutHandler(object sender, EventArgs e)
{
     throw new Exception("RPC timeout exception occured.");
}
Другие вопросы по тегам