Ожидание одного сообщения RabbitMQ с таймаутом
Я хотел бы отправить сообщение на сервер RabbitMQ, а затем дождаться ответного сообщения (в очереди для ответа). Конечно, я не хочу ждать вечно, если приложение, обрабатывающее эти сообщения, не работает - должен быть тайм-аут. Это звучит как очень простая задача, но я не могу найти способ сделать это. Теперь я столкнулся с этой проблемой как с py-amqplib, так и с клиентом RabbitMQ .NET.
Лучшее решение, которое у меня есть, это опрос basic_get
с sleep
промежуточный, но это довольно уродливо
def _wait_for_message_with_timeout(channel, queue_name, timeout):
slept = 0
sleep_interval = 0.1
while slept < timeout:
reply = channel.basic_get(queue_name)
if reply is not None:
return reply
time.sleep(sleep_interval)
slept += sleep_interval
raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)
Конечно, есть какой-то лучший способ?
5 ответов
Я только что добавил поддержку тайм-аута amqplib
в carrot
,
Это подкласс amqplib.client0_8.Connection
:
http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py
wait_multi
это версия channel.wait
возможность получать по произвольному количеству каналов.
Я предполагаю, что это может быть объединено вверх по течению в некоторый момент.
Вот что я сделал в.NET-клиенте:
protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
var consumer = new QueueingBasicConsumer(Channel);
var tag = Channel.BasicConsume(queueName, true, null, consumer);
try
{
object result;
if (!consumer.Queue.Dequeue(timeoutMs, out result))
throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));
return ((BasicDeliverEventArgs)result).Body;
}
finally
{
Channel.BasicCancel(tag);
}
}
К сожалению, я не могу сделать то же самое с py-amqplib, потому что его basic_consume
метод не вызывает обратный вызов, пока вы не позвоните channel.wait()
а также channel.wait()
не поддерживает тайм-ауты! Это глупое ограничение (с которым я постоянно сталкиваюсь) означает, что если вы никогда не получите другое сообщение, ваш поток будет заморожен навсегда.
Здесь есть пример использования qpid с msg = q.get(timeout=1)
это должно делать то, что вы хотите. Извините, я не знаю, какие другие клиентские библиотеки AMQP реализуют тайм-ауты (и, в частности, я не знаю двух конкретных, которые вы упомянули).
Кажется, это разрушает всю идею асинхронной обработки, но если вам нужно, я думаю, что правильный способ сделать это - использовать RpcClient.
Кролик теперь позволяет вам добавлять события тайм-аута. Просто оберните ваш код в try catch, а затем выбросите исключения в обработчики TimeOut и Disconnect:
try{
using (IModel channel = rabbitConnection.connection.CreateModel())
{
client = new SimpleRpcClient(channel, "", "", queue);
client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity
client.TimedOut += RpcTimedOutHandler;
client.Disconnected += RpcDisconnectedHandler;
byte[] replyMessageBytes = client.Call(message);
return replyMessageBytes;
}
}
catch (Exception){
//Handle timeout and disconnect here
}
private void RpcDisconnectedHandler(object sender, EventArgs e)
{
throw new Exception("RPC disconnect exception occured.");
}
private void RpcTimedOutHandler(object sender, EventArgs e)
{
throw new Exception("RPC timeout exception occured.");
}