Как создать собственный насос сообщений?

Я делаю небольшое упражнение, где мне нужно создать что-то похожее на насос сообщений. У меня есть очередь работы, и я хочу, чтобы работа выполнялась полностью в одном потоке, в то время как любой поток может добавить работу в очередь.

Queue<WorkToDo> queue;

Потоки используют ручку ожидания, чтобы сообщить насосу, что есть над чем поработать.

WaitHandle signal;

Насос просто зацикливается до тех пор, пока есть работа, а затем ожидает повторного запуска сигнала.

while(ApplicationIsRunning){
    while(queue.HasWork){
        DoWork(queue.NextWorkItem)
    }
    signal.Reset();
    signal.WaitOne();
}

Любой другой поток может добавить работу в очередь и сигнализировать о ручке ожидания...

public void AddWork(WorkToDo work){
    queue.Add(work);
    signal.Set();
} 

Проблема в том, что, если работа добавляется достаточно быстро, может возникнуть условие, когда работа может быть оставлена ​​в очереди, потому что между проверкой очереди на работу и сбросом WaitHandle другой поток может добавить работу в очередь.

Как бы я мог смягчить это обстоятельство, не ставя дорогостоящий мьютекс вокруг WaitHandle?

3 ответа

Решение

Ты можешь использовать BlockingCollection<T> сделать реализацию очереди намного проще, так как она будет обрабатывать синхронизацию для вас:

public class MessagePump
{
    private BlockingCollection<Action> actions = new BlockingCollection<Action>();

    public void Run() //you may want to restrict this so that only one caller from one thread is running messages
    {
        foreach (var action in actions.GetConsumingEnumerable())
            action();
    }

    public void AddWork(Action action)
    {
        actions.Add(action);
    }

    public void Stop()
    {
        actions.CompleteAdding();
    }
}

Вам не нужно делать полный мьютекс, но вы можете поставить оператор блокировки

public void AddWork(WorkToDo work)
{
  queue.Add(work);
  lock(lockingObject)
  {
    signal.Set();
  }
} 

используйте все, что вы хотите для объекта блокировки, большинство людей скажут, что использование самого сигнала - плохая идея.

Отвечая на комментарий @500 - Внутренняя ошибка сервера ниже, вы можете просто сбросить сигнал перед выполнением работы. Следующее должно защищать вещи:

while(ApplicationIsRunning)
{
  while(queue.HasWork)
  {
    WorkItem wi;
    lock(lockingObject)
    {
       wi = queue.NextWorkItem;
       if(!queue.HasWork)
       {
          signal.Reset();
       }
    }
    DoWork(wi)
  }

  signal.WaitOne();
}

Таким образом, если у вас есть больше работы, внутренняя очередь продолжает работать. Если нет, то выпадает signal.WaitOne(), и мы только сбрасываем, если в очереди больше нет работы.

Единственным недостатком здесь является то, что возможно, что мы будем сбрасывать несколько раз подряд, если работа приходит, пока DoWork выполняется.

Вы можете использовать метод WaitOne(TimeSpan), чтобы у вас был гибридный сигнал / цикл опроса. В основном укажите время ожидания не более 1 секунды. Это может привести к тому, что задача, попавшая в эту гонку, будет удерживаться не более секунды (или в любое другое время, указанное вами), или до тех пор, пока в вашу очередь не будет добавлена ​​другая задача.

Другие вопросы по тегам