Потокобезопасная реентерабельная очередь с использованием peek
Моя основная проблема заключается в необходимости немедленно обрабатывать элементы из очереди, если очередь пуста, или добавить элемент в очередь и выйти, если элемент уже обрабатывается.
Я пробую технику, которая использует peek, чтобы упростить вещи, и мне интересно, какие есть ошибки, которые могут помешать. Спасибо!
void SequenceAction(Action action) {
bool go = false;
lock (_RaiseEventQueueLock) {
_RaiseEventQueue.Enqueue(action);
go = (_RaiseEventQueue.Count == 1);
}
// 'go' can only be true if queue was empty when queue
// was locked and item was enqueued.
while (go) {
#if naive_threadsafe_presumption
// Peek is threadsafe because in effect this loop owns
// the zeroeth item in the queue for as long as the queue
// remains non-empty.
(_RaiseEventQueue.Peek())();
#else
Action a;
lock (_RaiseEventQueueLock) {
a = _RaiseEventQueue.Peek();
}
a();
#endif
// Lock the queue to see if any item was enqueued while
// the zeroeth item was being processed.
// Note that the code processing an item can call this
// function reentrantly, adding to its own todo list
// while insuring that that each item is processed
// to completion.
lock (_RaiseEventQueueLock) {
_RaiseEventQueue.Dequeue();
go = (_RaiseEventQueue.Count > 0);
}
}
}
2 ответа
// If action already in progress, add new
// action to queue and return.
// If no action in progress, begin processing
// the new action and continue processing
// actions added to the queue in the meantime.
void SequenceAction(Action action) {
lock (_SequenceActionQueueLock) {
_SequenceActionQueue.Enqueue(action);
if (_SequenceActionQueue.Count > 1) {
return;
}
}
// Would have returned if queue was not empty
// when queue was locked and item was enqueued.
for (;;) {
action();
lock (_SequenceActionQueueLock) {
_SequenceActionQueue.Dequeue();
if (_SequenceActionQueue.Count == 0) {
return;
}
action = _SequenceActionQueue.Peek();
}
}
}
На самом деле, ваш Peek
не является потокобезопасным. Добавление элемента в очередь может привести к изменению размера резервного хранилища (массива, в конце концов). Я представляю, что очередь реализована в кольцевом буфере, с головными и хвостовыми индексами для вставки и удаления.
Итак, представьте, что произойдет, если в очереди, скажем, 16 элементов. Вставить в 8, а Удалить в 9. Очередь заполнена. Тогда это происходит:
- Поток A вызывает Peek, извлекает индекс удаления (9).
- Нить A выгружается.
- Поток B вызывает Enqueue и видит, что он должен увеличить очередь.
- Поток B выделяет новый массив из 32 элементов и копирует данные из существующего массива. Данные копируются по порядку, начиная с Удалить и оборачиваясь.
- Резьба B устанавливает Удалить в 0 и Вставить в 16.
- Поток A получает следующий временной интервал и возвращает элемент в позиции 9.
- Вы только что обработали событие не по порядку, и в итоге вы снова его обработаете.
- Хуже того, вы удалите элемент в позиции 0, не обрабатывая его.
Вы можете решить эту проблему с помощью:
Action nextAction;
lock (_RaiseEventQueueLock)
{
nextAction = _RaiseEventQueue.Peek();
}
nextAction();
Я бы не стал ставить на это свою профессиональную карьеру. Я бы предложил использовать BlockingCollection и дизайн производителя / потребителя.
Возможное исправление
Мне приходит в голову, что следующее должно делать то, что вы хотели.
private readonly object _queueLock = new object();
private readonly object _processLock = new object();
void SequenceAction(Action action)
{
lock (_queueLock)
{
_RaiseEventQueue.Enqueue(action);
}
if (Monitor.TryEnter(_processLock))
{
while (true)
{
Action a;
lock (_queueLock)
{
if (_RaiseEventQueue.Count == 0) return;
a = _RaiseEventQueue.Dequeue();
}
a();
}
Monitor.Exit(_processLock);
}
}