Блокировка Deque в Win32 C++
Я довольно новичок в структурах данных без блокировки, поэтому для упражнения я написал (что, как я надеюсь, функционирует как) ограниченную деку без блокировки (пока нет изменения размера, просто хочу, чтобы базовые случаи работали). Я просто хотел бы получить подтверждение от людей, которые знают, что они делают, о том, правильно ли я понял и / или как я мог бы это улучшить.
class LocklessDeque
{
public:
LocklessDeque() : m_empty(false),
m_bottom(0),
m_top(0) {}
~LocklessDeque()
{
// Delete remaining tasks
for( unsigned i = m_top; i < m_bottom; ++i )
delete m_tasks[i];
}
void PushBottom(ITask* task)
{
m_tasks[m_bottom] = task;
InterlockedIncrement(&m_bottom);
}
ITask* PopBottom()
{
if( m_bottom - m_top > 0 )
{
m_empty = false;
InterlockedDecrement(&m_bottom);
return m_tasks[m_bottom];
}
m_empty = true;
return NULL;
}
ITask* PopTop()
{
if( m_bottom - m_top > 0 )
{
m_empty = false;
InterlockedIncrement(&m_top);
return m_tasks[m_top];
}
m_empty = true;
return NULL;
}
bool IsEmpty() const
{
return m_empty;
}
private:
ITask* m_tasks[16];
bool m_empty;
volatile unsigned m_bottom;
volatile unsigned m_top;
};
4 ответа
Глядя на это, я думаю, что это будет проблемой:
void PushBottom(ITask* task)
{
m_tasks[m_bottom] = task;
InterlockedIncrement(&m_bottom);
}
Если это используется в реальной многопоточной среде, я думаю, что вы столкнетесь при настройке m_tasks[m_bottom]
, Подумайте, что произойдет, если у вас есть два потока, пытающихся сделать это одновременно - вы не можете быть уверены, какой из них на самом деле установил m_tasks[m_bottom].
Проверьте эту статью, которая является разумным обсуждением очереди без блокировки.
Ваше использование m_bottom
а также m_top
Члены индекса для массива не в порядке. Вы можете использовать возвращаемое значение InterlockedXxxx (), чтобы получить безопасный индекс. Вам нужно будет потерять IsEmpty (), он никогда не будет точным в многопоточном сценарии. Та же проблема с пустой проверкой в PopXxx. Я не понимаю, как вы могли бы сделать это без мьютекса.
Ключом к выполнению почти невозможных вещей, подобных этому, является использование InterlockedCompareExchange. (Это имя используется Win32, но любая многопоточная платформа будет иметь эквивалент InterlockedCompareExchange).
Идея заключается в том, что вы создаете копию структуры (которая должна быть достаточно маленькой, чтобы выполнить атомарное чтение (64 или, если вы можете справиться с некоторой непереносимостью, 128 бит на x86).
Вы делаете еще одну копию с вашим предложенным обновлением, выполняете свою логику и обновляете копию, затем вы обновляете "реальную" структуру, используя InterlockedCompareExchange. InterlockedCompareExchange выполняет атомарную проверку того, что значение по-прежнему равно значению, с которым вы начали до обновления состояния, и, если оно все еще является этим значением, автоматически обновляет значение с новым состоянием. Как правило, это заключено в бесконечный цикл, который продолжает попытки, пока кто-то еще не изменил значение в середине. Вот примерно шаблон:
union State
{
struct
{
short a;
short b;
};
uint32_t volatile rawState;
} state;
void DoSomething()
{
// Keep looping until nobody else changed it behind our back
for (;;)
{
state origState;
state newState;
// It's important that you only read the state once per try
origState.rawState = state.rawState;
// This must copy origState, NOT read the state again
newState.rawState = origState.rawState;
// Now you can do something impossible to do atomically...
// This example takes a lot of cycles, there is huge
// opportunity for another thread to come in and change
// it during this update
if (newState.b == 3 || newState.b % 6 != 0)
{
newState.a++;
}
// Now we atomically update the state,
// this ONLY changes state.rawState if it's still == origState.rawState
// In either case, InterlockedCompareExchange returns what value it has now
if (InterlockedCompareExchange(&state.rawState, newState.rawState,
origState.rawState) == origState.rawState)
return;
}
}
(Пожалуйста, прости, если приведенный выше код на самом деле не компилируется - я записал его с ног на голову)
Отлично. Теперь вы можете легко создавать алгоритмы без блокировки. НЕПРАВИЛЬНО! Проблема в том, что вы строго ограничены объемом данных, которые вы можете обновлять атомарно.
Некоторые алгоритмы без блокировки используют технику, в которой они "помогают" параллельным потокам. Например, скажем, у вас есть связанный список, который вы хотите иметь возможность обновлять из нескольких потоков, другие потоки могут "помочь", выполнив обновления указателей "первый" и "последний", если они пролистывают и видят, что они в узле, указанном "последним", но указатель "следующий" в узле не является нулевым. В этом примере, заметив, что "последний" указатель неверен, они обновляют последний указатель, только если он все еще указывает на текущий узел, используя обмен сравнениями с блокировкой.
Не попадайтесь в ловушку, где вы "вращаетесь" или зацикливаетесь (как спин-блокировка). Хотя есть смысл в кратковременном вращении, потому что вы ожидаете, что "другой" поток что-то закончит - они могут этого не делать. "Другой" поток может быть переключен по контексту и может больше не работать. Вы просто тратите время процессора, сжигая электричество (возможно, убивая батарею ноутбука), вращаясь, пока условие не станет истинным. В тот момент, когда вы начинаете вращаться, вы также можете бросить свой код без блокировки и написать его с блокировками. Замки лучше, чем неограниченное вращение.
Просто перейдите от сложного к нелепому, рассмотрите беспорядок, с которым вы можете столкнуться с другими архитектурами - в целом, x86/x64 довольно простителен, но когда вы попадаете в другие "слабо упорядоченные" архитектуры, вы попадаете на территорию, где происходят такие вещи, которые не имеет смысла - обновления памяти не будут происходить в порядке программы, поэтому все ваши мысли о том, что делает другой поток, уходят в окно. (Даже x86/x64 имеют тип памяти, называемый "объединение записи", который часто используется при обновлении видеопамяти, но может использоваться для любого оборудования буфера памяти, где вам нужны ограждения). Эти архитектуры требуют использования операций "ограничения памяти", чтобы гарантировать что все чтения / записи / оба перед ограждением будут видны глобально (другими ядрами). Ограничение записи гарантирует, что любые записи перед разделителем будут видны глобально до любых записей после разделителя. Забор чтения гарантирует, что никакие чтения после забора не будут спекулятивно выполнены перед забором. Забой на чтение / запись (он же забор или забор памяти) даст обе гарантии. Заборы очень дорогие. (Некоторые используют термин "барьер" вместо "забор")
Мое предложение состоит в том, чтобы реализовать его сначала с помощью переменных блокировки / условия. Если у вас есть какие-либо проблемы с идеальной работой, попытка сделать реализацию без блокировки бесполезна. И всегда измеряй, измеряй, измеряй. Вы, вероятно, обнаружите, что производительность реализации с использованием блокировок совершенно безупречна - без неопределенности какой-либо ненадежной реализации без блокировок с навязчивой ошибкой зависания, которая будет отображаться только при демонстрации важного клиента. Возможно, вы можете решить проблему, переосмыслив исходную проблему в нечто более простое, возможно, путем реструктуризации работы, чтобы более крупные предметы (или партии предметов) попадали в коллекцию, что снижает нагрузку на все это.
Написание параллельных алгоритмов без блокировки очень сложно (я уверен, что вы видели написанное 1000 раз в другом месте). Это часто не стоит усилий.
Решая проблему, указанную Аароном, я бы сделал что-то вроде:
void PushBottom(ITask *task) {
int pos = InterlockedIncrement(&m_bottom);
m_tasks[pos] = task;
}
Точно так же, чтобы поп:
ITask* PopTop() {
int pos = InterlockedIncrement(&m_top);
if (pos == m_bottom) // This is still subject to a race condition.
return NULL;
return m_tasks[pos];
}
Я бы исключил оба m_empty
а также IsEmpty()
от дизайна полностью. Результат, возвращаемый IsEmpty, зависит от состояния гонки, то есть к тому времени, когда вы смотрите на этот результат, он вполне может быть устаревшим (то есть то, что он говорит вам об очереди, может оказаться неправильным, когда вы смотрите на то, что он возвратил). Точно так же, m_empty
не предоставляет ничего, кроме записи информации, которая уже доступна без нее, рецепт для получения устаревших данных. С помощью m_empty
не гарантирует, что это не может работать правильно, но это значительно увеличивает шансы на ошибки, IMO.
Я предполагаю, что это связано с временным характером кода, но сейчас у вас также есть некоторые серьезные проблемы с границами массива. Вы ничего не делаете, чтобы заставить ваши индексы массива обернуться, поэтому, как только вы попытаетесь поместить 17-ю задачу в очередь, у вас возникнет серьезная проблема.
Редактировать: я должен отметить, что состояние гонки, указанное в комментарии, довольно серьезное - и это не единственное. Хотя это несколько лучше, чем оригинал, это не следует путать с кодом, который будет работать правильно.
Я бы сказал, что написание правильного кода без блокировок значительно сложнее, чем написание правильного кода, использующего блокировки. Я не знаю никого, кто сделал бы это без четкого понимания кода, который использует блокировку. Основываясь на исходном коде, я бы сказал, что было бы намного лучше начать с написания и понимания кода для очереди, в которой используются блокировки, и только тогда, когда вы используете это, чтобы получить гораздо лучшее понимание проблем, которые действительно вызывают попытка безблокировочного кода.