RabbitMQ асинхронная поддержка

Имеет ли клиент RabbitMQ .NET какую-либо асинхронную поддержку? Я хотел бы иметь возможность подключаться и принимать сообщения асинхронно, но пока не нашел способа сделать это.

(Для потребления сообщений я могу использовать EventingBasicConsumer, но это не полное решение.)

Просто для краткости, это пример того, как я сейчас работаю с RabbitMQ (код взят из моего блога):

var factory = new ConnectionFactory() { HostName = "localhost" };

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
        channel.BasicConsume("testqueue", true, consumer);

        Console.ReadLine();
    }
}

3 ответа

Решение

Rabbit поддерживает диспетчеризацию асинхронных обработчиков сообщений с использованием AsyncEventingBasicConsumer учебный класс. Работает аналогично EventingBasicConsumer, но позволяет зарегистрировать обратный вызов, который возвращает Task, Обратный вызов отправляется и возвращается Task ожидается клиентом RabbitMQ.

var factory = new ConnectionFactory
{
    HostName = "localhost",
    DispatchConsumersAsync = true
};

using(var connection = cf.CreateConnection())
{
    using(var channel = conn.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);

        var consumer = new AsyncEventingBasicConsumer(model);

        consumer.Received += async (o, a) =>
        {
            Console.WriteLine("Message Get" + a.DeliveryTag);
            await Task.Yield();
        };
    }

    Console.ReadLine();
}

На данный момент в клиент RabbitMQ .NET не встроена поддержка async / await. Для этого есть открытый тикет в репозитории RabbitMQ .NET Client.

Подводя итог async / TPL служба поддержки:

  • Как упомянул @ Пол-Тернер, сейчас AsyncEventingBasicConsumer для которого вы можете зарегистрировать события и вернуть Task,
  • Также есть AsyncDefaultBasicConsumer для которого вы можете переопределить виртуальные методы, такие как HandleBasicDeliver и вернуть Task, Оригинальный пиар здесь (похоже, он тоже был представлен в 5.0?)
  • Согласно последним комментариям по вышеуказанному PR и этой проблеме, похоже, что они работают над новым, чистым клиентом.NET, который будет более полно поддерживать async операции, но я не вижу каких-либо конкретных ссылок на эти усилия.

Есть AsyncEventingBasicConsumer и все, что он делает, это awaitиспользование ваших асинхронных "обработчиков событий" при получении сообщения. Это единственное, что здесь сделано асинхронным. Обычно вы не получаете от этого никакой прибыли, потому что у вас только один "обработчик". Сообщения по-прежнему обрабатываются одно за другим. Они обрабатываются синхронно! Также вы теряете контроль над обработкой исключений, потому что ожидание выполняется внутри Consumer.

Позвольте предположить, что под асинхронной обработкой сообщений вы имеете в виду некоторую степень параллелизма.

В итоге я использовал ActionBlock из TPL Dataflow. ActionBlockвыполняет столько задач, сколько вы настроили, управляя ожиданиями и парелелизмом. Поскольку он работает с задачами, а не с потоками, он может управлять меньшими ресурсами, если они действительно асинхронны.

  1. Обычный EventingBasicConsumer звонки actionBlock.Post(something).
  2. Для параллельной обработки вам нужно указать RMQ, чтобы он отправил вам N сообщений, прежде чем вы ack их: model.BasicQos(0, N, true);
  3. ActionBlock имеет варианты с MaxDegreeOfParallelism свойство, которое также необходимо установить на N.
  4. ActionBlock запускается async Tasks, которые получают данные, ранее опубликованные Потребителем. Задачи не должны генерироваться, потому что ActionBlock останавливает всю обработку исключений.
  5. Будьте осторожны, чтобы пройти CancellationToken вокруг и правильно дождитесь, пока ActionBlock завершит все запущенные задачи: actionBlock.Complete(); await actionBlock.Completion;
Другие вопросы по тегам