RabbitMQ асинхронная поддержка
Имеет ли клиент RabbitMQ .NET какую-либо асинхронную поддержку? Я хотел бы иметь возможность подключаться и принимать сообщения асинхронно, но пока не нашел способа сделать это.
(Для потребления сообщений я могу использовать EventingBasicConsumer, но это не полное решение.)
Просто для краткости, это пример того, как я сейчас работаю с RabbitMQ (код взят из моего блога):
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("testqueue", true, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume("testqueue", true, consumer);
Console.ReadLine();
}
}
3 ответа
Rabbit поддерживает диспетчеризацию асинхронных обработчиков сообщений с использованием AsyncEventingBasicConsumer
учебный класс. Работает аналогично EventingBasicConsumer
, но позволяет зарегистрировать обратный вызов, который возвращает Task
, Обратный вызов отправляется и возвращается Task
ожидается клиентом RabbitMQ.
var factory = new ConnectionFactory
{
HostName = "localhost",
DispatchConsumersAsync = true
};
using(var connection = cf.CreateConnection())
{
using(var channel = conn.CreateModel())
{
channel.QueueDeclare("testqueue", true, false, false, null);
var consumer = new AsyncEventingBasicConsumer(model);
consumer.Received += async (o, a) =>
{
Console.WriteLine("Message Get" + a.DeliveryTag);
await Task.Yield();
};
}
Console.ReadLine();
}
На данный момент в клиент RabbitMQ .NET не встроена поддержка async / await. Для этого есть открытый тикет в репозитории RabbitMQ .NET Client.
Подводя итог async
/ TPL
служба поддержки:
- Как упомянул @ Пол-Тернер, сейчас
AsyncEventingBasicConsumer
для которого вы можете зарегистрировать события и вернутьTask
, - Также есть
AsyncDefaultBasicConsumer
для которого вы можете переопределить виртуальные методы, такие какHandleBasicDeliver
и вернутьTask
, Оригинальный пиар здесь (похоже, он тоже был представлен в 5.0?) - Согласно последним комментариям по вышеуказанному PR и этой проблеме, похоже, что они работают над новым, чистым клиентом.NET, который будет более полно поддерживать
async
операции, но я не вижу каких-либо конкретных ссылок на эти усилия.
Есть
AsyncEventingBasicConsumer
и все, что он делает, это
await
использование ваших асинхронных "обработчиков событий" при получении сообщения. Это единственное, что здесь сделано асинхронным. Обычно вы не получаете от этого никакой прибыли, потому что у вас только один "обработчик". Сообщения по-прежнему обрабатываются одно за другим. Они обрабатываются синхронно! Также вы теряете контроль над обработкой исключений, потому что ожидание выполняется внутри Consumer.
Позвольте предположить, что под асинхронной обработкой сообщений вы имеете в виду некоторую степень параллелизма.
В итоге я использовал
ActionBlock
из TPL Dataflow.
ActionBlock
выполняет столько задач, сколько вы настроили, управляя ожиданиями и парелелизмом. Поскольку он работает с задачами, а не с потоками, он может управлять меньшими ресурсами, если они действительно асинхронны.
- Обычный
EventingBasicConsumer
звонкиactionBlock.Post(something)
. - Для параллельной обработки вам нужно указать RMQ, чтобы он отправил вам N сообщений, прежде чем вы
ack
их:model.BasicQos(0, N, true);
- ActionBlock имеет варианты с
MaxDegreeOfParallelism
свойство, которое также необходимо установить на N. - ActionBlock запускается
async Task
s, которые получают данные, ранее опубликованные Потребителем. Задачи не должны генерироваться, потому что ActionBlock останавливает всю обработку исключений. - Будьте осторожны, чтобы пройти
CancellationToken
вокруг и правильно дождитесь, пока ActionBlock завершит все запущенные задачи:actionBlock.Complete(); await actionBlock.Completion;