RabbitMQ обрабатывает только 50 сообщений, затем блокирует

Я использую RabbitMQ в.net, и я вижу странную проблему, когда я отбрасываю 100 сообщений в очередь. Обрабатывает примерно 50 сообщений, затем Dequeue() метод просто виснет. Если я перезапускаю сервис, он обрабатывает остальные пункты.

РЕДАКТИРОВАТЬ: он обрабатывает ровно 50% очереди. Когда я добавляю 1000 сообщений, он обрабатывает только 500. Даже когда однопоточный

Что мне здесь не хватает?

    private void InitializeAgent() {
        var agentFactory = new ConnectionFactory() { HostName = "localhost" };
        agentConnection = agentFactory.CreateConnection();
        agentChannel = agentConnection.CreateModel();
        var ok = agentChannel.QueueDeclare(GetType().Name, true, false, false, null);
        consumer = new QueueingBasicConsumer(agentChannel);
        agentChannel.BasicConsume(GetType().Name, false, consumer);
    }

    public void DequeueMessages() {
        ThreadPool.SetMaxThreads(200, 200);
        ThreadPool.SetMinThreads(200, 200);
        var ea = consumer.Queue.Dequeue();
        ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
    }

    public void AgentTask() {
        var instance = factory.GetInstance(threadItem);

        while (true) 
            DequeueMessages();
    }

    private void ProcessWorkInThread(object state) {
         var ea = state as BasicDeliverEventArgs;

         var message = Encoding.UTF8.GetString(ea.Body);

         var settings = new JsonSerializerSettings();
         settings.ContractResolver = new DefaultContractResolver() { DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public };
         var item = JsonConvert.DeserializeObject<TEntity>(message, settings);

         Thread.Sleep(10000) //simulate work
         lock (agentChannel)             
             agentChannel.BasicAck(ea.DeliveryTag, false);            
     }

1 ответ

Это должно быть проблема версии клиента.NET, используя 3.4.0следующий код работает как ожидалось.

static readonly ConnectionFactory Factory = new ConnectionFactory { HostName = "localhost" };
static readonly IConnection Connection = Factory.CreateConnection();
static QueueingBasicConsumer consumer;
static IModel agentChannel;

static CancellationTokenSource _tokenSource;

static void Main(string[] args)
{
    _tokenSource = new CancellationTokenSource();

    const string queueName = "testQueue";
    agentChannel = Connection.CreateModel();
    agentChannel.QueueDeclare(queueName, true, false, false, null);
    agentChannel.QueueBind(queueName, "testExchange", "");

    consumer = new QueueingBasicConsumer(agentChannel);
    agentChannel.BasicConsume(queueName, false, consumer);

    while (!_tokenSource.Token.IsCancellationRequested)
    {
        DequeueMessages();
    }
    Console.ReadLine();
    _tokenSource.Cancel();
}

static void DequeueMessages()
{
    ThreadPool.SetMaxThreads(200, 200);
    ThreadPool.SetMinThreads(200, 200);
    var ea = consumer.Queue.Dequeue();
    ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
}

static void ProcessWorkInThread(object state)
{
    var ea = state as BasicDeliverEventArgs;

    var message = Encoding.UTF8.GetString(ea.Body);

    var settings = new JsonSerializerSettings();
    settings.ContractResolver = new DefaultContractResolver()
    {
        DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public
    };
    var item = JsonConvert.DeserializeObject<string>(message, settings);
    Console.WriteLine(item);
    Thread.Sleep(10000); //simulate work
    lock (agentChannel)
        agentChannel.BasicAck(ea.DeliveryTag, false);
}
Другие вопросы по тегам