Как создать собственный насос сообщений?
Я делаю небольшое упражнение, где мне нужно создать что-то похожее на насос сообщений. У меня есть очередь работы, и я хочу, чтобы работа выполнялась полностью в одном потоке, в то время как любой поток может добавить работу в очередь.
Queue<WorkToDo> queue;
Потоки используют ручку ожидания, чтобы сообщить насосу, что есть над чем поработать.
WaitHandle signal;
Насос просто зацикливается до тех пор, пока есть работа, а затем ожидает повторного запуска сигнала.
while(ApplicationIsRunning){
while(queue.HasWork){
DoWork(queue.NextWorkItem)
}
signal.Reset();
signal.WaitOne();
}
Любой другой поток может добавить работу в очередь и сигнализировать о ручке ожидания...
public void AddWork(WorkToDo work){
queue.Add(work);
signal.Set();
}
Проблема в том, что, если работа добавляется достаточно быстро, может возникнуть условие, когда работа может быть оставлена в очереди, потому что между проверкой очереди на работу и сбросом WaitHandle другой поток может добавить работу в очередь.
Как бы я мог смягчить это обстоятельство, не ставя дорогостоящий мьютекс вокруг WaitHandle?
3 ответа
Ты можешь использовать BlockingCollection<T>
сделать реализацию очереди намного проще, так как она будет обрабатывать синхронизацию для вас:
public class MessagePump
{
private BlockingCollection<Action> actions = new BlockingCollection<Action>();
public void Run() //you may want to restrict this so that only one caller from one thread is running messages
{
foreach (var action in actions.GetConsumingEnumerable())
action();
}
public void AddWork(Action action)
{
actions.Add(action);
}
public void Stop()
{
actions.CompleteAdding();
}
}
Вам не нужно делать полный мьютекс, но вы можете поставить оператор блокировки
public void AddWork(WorkToDo work)
{
queue.Add(work);
lock(lockingObject)
{
signal.Set();
}
}
используйте все, что вы хотите для объекта блокировки, большинство людей скажут, что использование самого сигнала - плохая идея.
Отвечая на комментарий @500 - Внутренняя ошибка сервера ниже, вы можете просто сбросить сигнал перед выполнением работы. Следующее должно защищать вещи:
while(ApplicationIsRunning)
{
while(queue.HasWork)
{
WorkItem wi;
lock(lockingObject)
{
wi = queue.NextWorkItem;
if(!queue.HasWork)
{
signal.Reset();
}
}
DoWork(wi)
}
signal.WaitOne();
}
Таким образом, если у вас есть больше работы, внутренняя очередь продолжает работать. Если нет, то выпадает signal.WaitOne()
, и мы только сбрасываем, если в очереди больше нет работы.
Единственным недостатком здесь является то, что возможно, что мы будем сбрасывать несколько раз подряд, если работа приходит, пока DoWork
выполняется.
Вы можете использовать метод WaitOne(TimeSpan), чтобы у вас был гибридный сигнал / цикл опроса. В основном укажите время ожидания не более 1 секунды. Это может привести к тому, что задача, попавшая в эту гонку, будет удерживаться не более секунды (или в любое другое время, указанное вами), или до тех пор, пока в вашу очередь не будет добавлена другая задача.