RabbitMQ обрабатывает только 50 сообщений, затем блокирует
Я использую RabbitMQ в.net, и я вижу странную проблему, когда я отбрасываю 100 сообщений в очередь. Обрабатывает примерно 50 сообщений, затем Dequeue()
метод просто виснет. Если я перезапускаю сервис, он обрабатывает остальные пункты.
РЕДАКТИРОВАТЬ: он обрабатывает ровно 50% очереди. Когда я добавляю 1000 сообщений, он обрабатывает только 500. Даже когда однопоточный
Что мне здесь не хватает?
private void InitializeAgent() {
var agentFactory = new ConnectionFactory() { HostName = "localhost" };
agentConnection = agentFactory.CreateConnection();
agentChannel = agentConnection.CreateModel();
var ok = agentChannel.QueueDeclare(GetType().Name, true, false, false, null);
consumer = new QueueingBasicConsumer(agentChannel);
agentChannel.BasicConsume(GetType().Name, false, consumer);
}
public void DequeueMessages() {
ThreadPool.SetMaxThreads(200, 200);
ThreadPool.SetMinThreads(200, 200);
var ea = consumer.Queue.Dequeue();
ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
}
public void AgentTask() {
var instance = factory.GetInstance(threadItem);
while (true)
DequeueMessages();
}
private void ProcessWorkInThread(object state) {
var ea = state as BasicDeliverEventArgs;
var message = Encoding.UTF8.GetString(ea.Body);
var settings = new JsonSerializerSettings();
settings.ContractResolver = new DefaultContractResolver() { DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public };
var item = JsonConvert.DeserializeObject<TEntity>(message, settings);
Thread.Sleep(10000) //simulate work
lock (agentChannel)
agentChannel.BasicAck(ea.DeliveryTag, false);
}
1 ответ
Это должно быть проблема версии клиента.NET, используя 3.4.0
следующий код работает как ожидалось.
static readonly ConnectionFactory Factory = new ConnectionFactory { HostName = "localhost" };
static readonly IConnection Connection = Factory.CreateConnection();
static QueueingBasicConsumer consumer;
static IModel agentChannel;
static CancellationTokenSource _tokenSource;
static void Main(string[] args)
{
_tokenSource = new CancellationTokenSource();
const string queueName = "testQueue";
agentChannel = Connection.CreateModel();
agentChannel.QueueDeclare(queueName, true, false, false, null);
agentChannel.QueueBind(queueName, "testExchange", "");
consumer = new QueueingBasicConsumer(agentChannel);
agentChannel.BasicConsume(queueName, false, consumer);
while (!_tokenSource.Token.IsCancellationRequested)
{
DequeueMessages();
}
Console.ReadLine();
_tokenSource.Cancel();
}
static void DequeueMessages()
{
ThreadPool.SetMaxThreads(200, 200);
ThreadPool.SetMinThreads(200, 200);
var ea = consumer.Queue.Dequeue();
ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
}
static void ProcessWorkInThread(object state)
{
var ea = state as BasicDeliverEventArgs;
var message = Encoding.UTF8.GetString(ea.Body);
var settings = new JsonSerializerSettings();
settings.ContractResolver = new DefaultContractResolver()
{
DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public
};
var item = JsonConvert.DeserializeObject<string>(message, settings);
Console.WriteLine(item);
Thread.Sleep(10000); //simulate work
lock (agentChannel)
agentChannel.BasicAck(ea.DeliveryTag, false);
}