C++11: поток с мьютексом видит изменение значения атомарной переменной, несмотря на то, что это единственный код, который может его изменить
Атомарная переменная (в данном случае 128-битная структура) обновляется, к удивлению единственного потока, который сможет ее обновить. Как так?
Это минимальный пример, поэтому он не делает ничего, что имеет смысл, но: функция alloc() возвращает буфер malloc 100 раз, затем выделяет новый буфер, который он вернет 100 раз, и так далее, даже в лицо вызова с несколькими потоками.
У меня есть атомарная переменная, которая представляет собой структуру с указателем, 32-битным int и еще одним 32-битным счетчиком, предназначенным для предотвращения проблем с ABA.
У меня есть функция с двумя разделами. В первом разделе, если счетчик возврата не равен нулю, будет CAS структура для уменьшения счетчика возврата (и увеличения счетчика ABA), а затем вернет указатель. В противном случае вторая секция получает мьютекс, выделяет память для нового указателя, а небольшая структура CAS полностью с новым указателем, новым ненулевым счетчиком возврата и снова приращением счетчика ABA.
Короче говоря, каждый поток может обновлять эту структуру, когда счетчик больше нуля. Но как только он будет равен нулю, я думаю, что первый поток, который запросит мьютекс, будет единственным потоком, который может снова обновить эту структуру CAS.
За исключением того, что иногда этот CAS не работает! "Как это может потерпеть неудачу" - вот мой вопрос.
Вот работающий пример. Его можно скомпилировать сg++ lockchange.cxx -o lockchange -latomic -pthread
. Он работает наgcc version 9.2.1 20190827 (Red Hat 9.2.1-1) (GCC)
на Fedora 31.
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cassert>
#include <cstring>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;
struct MyPair { /* Hungarian: pair */
char* pc; /* a buffer to be used n times */
int32_t iRemaining; /* number of times left to use pc */
uint32_t iUpdates; /* to avoid ABA problem */
};
const int iThreads{ 200 };
const int iThreadIterations{ 1000000 };
const int iSizeItem{ 128 };
mutex mux;
atomic<MyPair> pairNext;
char* alloc() {
TRY_AGAIN:
MyPair pairCur = pairNext.load();
// CASE 1: We can use the existing buffer?
while ( pairCur.iRemaining ) {
char* pcRV = pairCur.pc;
MyPair pairNew = { pairCur.pc,
pairCur.iRemaining - 1,
pairCur.iUpdates + 1 };
if ( pairNext.compare_exchange_weak( pairCur, pairNew ) )
return pcRV;
// Otherwise, pairNext was changed out from under us and pairCur
// will have been updated. Try again, as long as iRemaining
// non-zero.
}
// CASE 2: We've used pc as many times as allowed, so allocate a new pc.
// Get a mutex as we'll be changing too many fields to do atomically.
lock_guard<mutex> guard( mux );
// If multiple threads saw iRemaining = 0, they all will
// have tried for the mutex; only one will have gotten it, so
// there's a good chance that by the time we get the mutex, a
// sibling thread will have allocated a new pc and placed it at
// pairNext, so we don't need to allocate after all.
if ( pairNext.load().iRemaining ) // <=============================== it's as if this line isn't seeing the update made by the line below in real time.
goto TRY_AGAIN;
// Get a new buffer.
char* pcNew = (char*) malloc( iSizeItem );
MyPair pairNew = { pcNew, 100, pairCur.iUpdates + 1 };
if ( pairNext.compare_exchange_strong( pairCur, pairNew ) ) { //<===== the update that's not being seen above in real time
// *** other stuff with pcNew that needs mutex protection ***;
return pcNew;
} else {
// CASE 2c: after allocating a new page, we find that
// another thread has beaten us to it. I CAN'T FIGURE OUT
// HOW THAT'S POSSIBLE THOUGH. Our response should be safe
// enough: put our allocation back, and start all over again
// because who knows what else we missed. I see this error
// like 813 times out of 40 BILLION allocations in the
// hammer test, ranging from 1 to 200 threads.
printf( "unexpected: had lock but pairNext changed when iRemaining=0\n" );
// In fact the following free and goto should and seem to
// recover fine, but to be clear my question is how we can
// possibly end up here in the first place.
abort();
free( pcNew );
goto TRY_AGAIN;
}
}
void Test( int iThreadNumber ) {
for ( int i = 0; i < iThreadIterations; i++ )
alloc();
}
int main( int nArg, char* apszArg[] ) {
vector<thread> athr;
for ( int i = 0; i < iThreads; i++ )
athr.emplace_back( Test, i );
for ( auto& thr: athr )
thr.join();
}
2 ответа
Обратите внимание, что goto TRY_AGAIN;
разблокирует мьютекс, потому что вы возвращаетесь к предыдущему lock_guard<mutex>
был построен. Обычно люди ставят{}
вокруг прицела с фиксатором вверху, чтобы прояснить это (и контролировать, когда происходит разблокировка). Я не проверял правила ISO C++, чтобы узнать, требуется ли это поведение, но, по крайней мере, то, как G++ и clang++ его реализуют,goto
разблокирует. (Смешивание блокировки RAII сgoto
вроде плохой дизайн).
Также обратите внимание, что вы перезагружаете pairNext
один раз, удерживая мьютекс, но отбросьте это значение и оставьте pairCur
в качестве "ожидаемого" значения для вашей попытки CAS.
Для достижения CAS внутри критической секции, pairNext.iRemaining
либо должно быть
- по-прежнему ноль (например, этот поток выиграл гонку за блокировку). Вы предполагаете этот случай, когда CAS успешен, потому что
pairNext == pairCur
. - или снова ноль после установки другого потока или потоков
iRemaining
до 100 и уменьшил его до нуля, пока этот поток спал. С большим количеством потоков, чем ядер, это может произойти очень легко. Однако это всегда возможно даже при большом количестве ядер: прерывание может временно заблокировать поток, или его стратегия отсрочки, когда он обнаруживает блокировки мьютекса, может привести к тому, что он не будет повторять попытку, пока счетчик снова не станет нулевым.
Я добавил новый код отладки, который проясняет это:
lock_guard<mutex> guard( mux ); // existing code
if ( pairNext.load().iRemaining )
goto TRY_AGAIN;
// new debugging code
MyPair tmp = pairNext.load();
if (memcmp(&tmp, &pairCur, sizeof(tmp)) != 0)
printf("pairNext changed between retry loop and taking the mutex\n"
"cur = %p, %d, %u\n"
"next = %p, %d, %u\n",
pairCur.pc, pairCur.iRemaining, pairCur.iUpdates,
tmp.pc, tmp.iRemaining, tmp.iUpdates);
$ clang++ -g -O2 lc.cpp -o lockchange -latomic -pthread && ./lockchange
pairNext changed between retry loop and taking the mutex
cur = 0x7f594c000e30, 0, 808
next = 0x7f5940000b60, 0, 909
unexpected: had lock but pairNext changed when iRemaining=0
Aborted (core dumped)
Исправляем это:
Поскольку вы перезагружаете pairNext
удерживая мьютекс, просто используйте это значение как "ожидаемое" для CAS. К сожалению, компиляторы не оптимизируютfoo.load().member
в загрузку только этого члена: они по-прежнему загружают весь 16-байтовый объект с lock cmpxchg16b
на x86-64 или на других ISA. Так что вы все равно оплачиваете всю стоимость.
lock_guard<mutex> guard( mux );
pairCur = pairNext.load(); // may have been changed by other threads
if ( pairCur.iRemaining )
goto TRY_AGAIN;
// then same as before, use it for CAS
// no other thread can be in the critical section,
// and the code outside won't do anything while pairNext.iRemaining == 0
16-байтовая атомная загрузка в любом случае стоит столько же, сколько CAS, но путь отказа должен либо освободить malloc
буфер или вращение, пока CAS не преуспеет, прежде чем покинуть критическую секцию. Последнее могло бы действительно работать, если бы вы могли не тратить слишком много времени процессора и вызывать конфликты, например, с_mm_pause()
.
Проблема известна как "проблема ABA", которую я мог бы резюмировать как проверку переменной в многопоточном кодировании без блокировки и думать, что она не изменилась, но изменилась.
Вот, iRemaining
- это счетчик, установленный на 100, а затем повторный обратный отсчет до 0.
После того, как мьютекс заблокирован, выполняется "проверка оптимизации" (не требуется для обеспечения корректности, а просто для того, чтобы избежать накладных расходов на выделение нового буфера и сброс iRemaining
и т. д., если это сделал другой поток) наивно проверяет наличие iRemaining == 0
определить, что структура pairCur
не изменился во время получения блокировки (что действительно может потребовать длительного ожидания).
На самом деле происходит то, что пока поток A ожидает получения блокировки, редко, но с учетом миллиардов попыток довольно много раз,iRemaining
уменьшается в 100 раз. Позволяя коду работатьabort()
затем, глядя на переменные, я вижу, что pairNext
имеет значение, скажем, { pc = XXX, iRemaining = 0, iUpdates = 23700 }
но pairNew
является { pc = YYY, iRemaining = 100, iUpdates = 23600 }
. iUpdates
сейчас на 100 больше, чем мы думали! Другими словами, пока мы ждали блокировки, было сделано еще 100 обновлений, и это было точное число, которое нужно было включить.iRemaining
снова на 0. Это также означаетpc
отличается от прежнего,
В структуре уже есть "счетчик обновлений" iUpdates
это стандартное решение, позволяющее избежать проблемы ABA. Если вместо проверкиiRemaining == 0
мы проверяем iUpdates
быть таким же, как наш атомарный снимок с предварительной блокировкой, тогда эвристика оптимизации становится на 100% эффективной, и мы никогда не дойдем до неожиданного printf()
а также abort()
. (Что ж, это все еще может произойти, но теперь требуется, чтобы поток был заблокирован для точного числа операций, кратных 2^32, вместо только 100 операций, и что может происходить только один раз в год, десятилетие или столетие, если даже возможно на эту архитектуру.) Вот улучшенный код:
if ( pairNext.load().iUpdates != pairCur.iUpdates ) // <=============================== it's as if this line isn'tseeing the update made by the line below in real time.