Потокобезопасная реентерабельная очередь с использованием peek

Моя основная проблема заключается в необходимости немедленно обрабатывать элементы из очереди, если очередь пуста, или добавить элемент в очередь и выйти, если элемент уже обрабатывается.

Я пробую технику, которая использует peek, чтобы упростить вещи, и мне интересно, какие есть ошибки, которые могут помешать. Спасибо!

    void SequenceAction(Action action) {
       bool go = false;

       lock (_RaiseEventQueueLock) {
          _RaiseEventQueue.Enqueue(action);
          go = (_RaiseEventQueue.Count == 1); 
       }

       // 'go' can only be true if queue was empty when queue 
       //  was locked and item was enqueued.
       while (go) {
          #if naive_threadsafe_presumption 
          // Peek is threadsafe because in effect this loop owns
          //  the zeroeth item in the queue for as long as the queue 
          //  remains non-empty.
          (_RaiseEventQueue.Peek())();

          #else
          Action a;
          lock (_RaiseEventQueueLock) {
             a = _RaiseEventQueue.Peek();
          }
          a();
          #endif   

          // Lock the queue to see if any item was enqueued while
          //  the zeroeth item was being processed.
          // Note that the code processing an item can call this 
          //  function reentrantly, adding to its own todo list
          //  while insuring that that each item is processed 
          //  to completion.
          lock (_RaiseEventQueueLock) {
             _RaiseEventQueue.Dequeue();
             go = (_RaiseEventQueue.Count > 0); 
          }
       }
    }

2 ответа

Решение
    // If action already in progress, add new
    //  action to queue and return.
    // If no action in progress, begin processing
    //  the new action and continue processing
    //  actions added to the queue in the meantime.
    void SequenceAction(Action action) {
       lock (_SequenceActionQueueLock) {
          _SequenceActionQueue.Enqueue(action);
          if (_SequenceActionQueue.Count > 1) {
             return;
          }
       }
       // Would have returned if queue was not empty
       //  when queue was locked and item was enqueued.
       for (;;) {
          action();
          lock (_SequenceActionQueueLock) {
             _SequenceActionQueue.Dequeue();
             if (_SequenceActionQueue.Count == 0) {
                return;
             }
             action = _SequenceActionQueue.Peek();
          }
       }
    }

На самом деле, ваш Peek не является потокобезопасным. Добавление элемента в очередь может привести к изменению размера резервного хранилища (массива, в конце концов). Я представляю, что очередь реализована в кольцевом буфере, с головными и хвостовыми индексами для вставки и удаления.

Итак, представьте, что произойдет, если в очереди, скажем, 16 элементов. Вставить в 8, а Удалить в 9. Очередь заполнена. Тогда это происходит:

  1. Поток A вызывает Peek, извлекает индекс удаления (9).
  2. Нить A выгружается.
  3. Поток B вызывает Enqueue и видит, что он должен увеличить очередь.
  4. Поток B выделяет новый массив из 32 элементов и копирует данные из существующего массива. Данные копируются по порядку, начиная с Удалить и оборачиваясь.
  5. Резьба B устанавливает Удалить в 0 и Вставить в 16.
  6. Поток A получает следующий временной интервал и возвращает элемент в позиции 9.
  7. Вы только что обработали событие не по порядку, и в итоге вы снова его обработаете.
  8. Хуже того, вы удалите элемент в позиции 0, не обрабатывая его.

Вы можете решить эту проблему с помощью:

Action nextAction;
lock (_RaiseEventQueueLock)
{
    nextAction = _RaiseEventQueue.Peek();
}
nextAction();

Я бы не стал ставить на это свою профессиональную карьеру. Я бы предложил использовать BlockingCollection и дизайн производителя / потребителя.

Возможное исправление

Мне приходит в голову, что следующее должно делать то, что вы хотели.

private readonly object _queueLock = new object();
private readonly object _processLock = new object();

void SequenceAction(Action action)
{
    lock (_queueLock)
    {
        _RaiseEventQueue.Enqueue(action);
    }
    if (Monitor.TryEnter(_processLock))
    {
        while (true)
        {
            Action a;
            lock (_queueLock)
            {
                if (_RaiseEventQueue.Count == 0) return;
                a = _RaiseEventQueue.Dequeue();
            }
            a();
        }
        Monitor.Exit(_processLock);
    }
}
Другие вопросы по тегам