Службы Windows, обменивающиеся данными через MSMQ. Нужна ли служебная шина?
У меня есть эта проблема, когда система содержит узлы (службы Windows), которые отправляют сообщения для обработки и другие, которые извлекают сообщения и обрабатывают их.
Это было разработано таким образом, что push-узлы балансируют нагрузку между очередями, поддерживая циклический список очередей и чередующихся очередей после каждой отправки. Поэтому сообщение 1 будет отправлено в очередь 1, сообщение 2 - в очередь 2 и т. Д. До сих пор эта часть работала отлично.
Что касается извлечения сообщения, то мы разработали его так, чтобы сообщения извлекались аналогичным образом - сначала из очереди 1, затем из очереди 2 и т. Д. Теоретически каждый узел извлечения находится на отдельной машине и на практике пока это только слушал в одной очереди. Но недавно появившееся требование заставило нас создать на машине узел, который прослушивает более одной очереди: один, который обычно очень занят и заполнен миллионами сообщений, а другой, как правило, содержит всего несколько сообщений.
Проблема, с которой мы сталкиваемся, заключается в том, что способ, которым мы изначально проектировали узлы извлечения, переходит из очереди в очередь, пока не будет найдено сообщение. Если время ожидания истекло (скажем, через секунду), то оно переходит к следующей очереди.
Это больше не работает, потому что Q1 (заполненный миллионами сообщений) будет задерживаться примерно на секунду для каждого сообщения, так как после каждого извлечения из Q1 мы будем запрашивать сообщение у Q2 (а если оно не будет содержать, мы будем ждать секунду).
Итак, это выглядит так:
Q1 содержит 10 сообщений, а Q2 не содержит ни одного
- Pull узел запрашивает сообщение от Q1
- Q1 немедленно возвращает сообщение
- Pull узел запрашивает сообщение от Q1
- ------------ Ожидание секунды ------------- (Q2 пуст и время запроса истекло)
- Pull узел запрашивает сообщение от Q1
- Q1 немедленно возвращает сообщение
- Pull узел запрашивает сообщение от Q1
- ------------ Ожидание секунды ------------- (Q2 пуст и время запроса истекло)
и т.п.
Так что это явно неправильно.
Я думаю, что я ищу лучшее архитектурное решение здесь. Обработка сообщений не обязательно должна осуществляться в режиме реального времени, насколько это возможно, но должна быть надежной, и ни одно сообщение не должно быть потеряно!
Я хотел бы услышать ваши взгляды по этой проблеме.
Заранее спасибо Яннис
2 ответа
Я закончил тем, что создал набор потоков - один для каждого msmq, который должен быть обработан. В конструкторе я инициализирую эти потоки:
Storages.ForEach(queue =>
{
Task task = Task.Factory.StartNew(() =>
{
LoggingManager.LogInfo("Starting a local thread to read in mime messages from queue " + queue.Name, this.GetType());
while (true)
{
WorkItem mime = queue.WaitAndRetrieve();
if (mime != null)
{
_Semaphore.WaitOne();
_LocalStorage.Enqueue(mime);
lock (_locker) Monitor.Pulse(_locker);
LoggingManager.LogDebug("Adding no. " + _LocalStorage.Count + " item in queue", this.GetType());
}
}
});
});
_LocalStorage является поточно-ориентированной реализацией очереди (ConcurrentQueue, представленная в.NET 4.0)
Семафор является счетным семафором для управления вставками в _LocalStorage. _LocalStorage - это в основном буфер принимаемых сообщений, но мы не хотим, чтобы он становился слишком большим, пока узлы обработки заняты работой. В результате мы можем получить ВСЕ сообщения msmq в этом _LocalStorage, но заняты обработкой только 5 из них или около того. Это плохо как с точки зрения устойчивости (если программа неожиданно завершает работу, мы теряем все эти сообщения), так и с точки зрения производительности, поскольку потребление памяти для хранения всех этих элементов в памяти будет огромным. Поэтому нам нужно контролировать, сколько элементов мы храним в очереди буфера _LocalStorage.
- Пульсируем потоки, ожидающие работы (см. Ниже), чтобы новый элемент был добавлен в очередь с помощью простого Monitor.Pulse
Код, который удаляет рабочие элементы из очереди, выглядит следующим образом:
lock (_locker)
if (_LocalStorage.Count == 0)
Monitor.Wait(_locker);
WorkItem result;
if (_LocalStorage.TryDequeue(out result))
{
_Semaphore.Release();
return result;
}
return null;
Я надеюсь, что это может помочь кому-то разобраться в подобной проблеме.
Может быть, вы могли бы использовать событие ReceiveCompleted в классе MessageQueue? Нет необходимости опрашивать тогда.