Реализация concurrent_vector в соответствии с блогом Intel
Я пытаюсь реализовать потокобезопасный контейнер без блокировки, аналогичный std::vector, согласно этому https://software.intel.com/en-us/blogs/2008/07/24/tbbconcurrent_vector-secrets-of-memory-organization
Из того, что я понял, чтобы предотвратить перераспределение и аннулирование всех итераторов во всех потоках, вместо единого непрерывного массива, они добавляют новые смежные блоки.
Каждый блок, который они добавляют, имеет размер увеличивающихся степеней 2, поэтому они могут использовать log(index), чтобы найти правильный сегмент, где должен быть элемент в [index].
Из того, что я понял, у них есть статический массив указателей на сегменты, поэтому они могут быстро получить к ним доступ, однако они не знают, сколько сегментов хочет пользователь, поэтому они сделали небольшой начальный и, если количество сегментов превышает текущий счет, они выделяют огромный и переключаются на использование этого.
Проблема в том, что добавление нового сегмента не может быть выполнено безопасным способом без блокировки или, по крайней мере, я не понял, как это сделать. Я могу атомарно увеличить текущий размер, но только это.
А также переключение с малого на большой массив указателей сегментов требует большого выделения памяти и копий памяти, поэтому я не могу понять, как они это делают.
У них есть некоторый код, размещенный в сети, но все важные функции не имеют доступного исходного кода, они находятся в их библиотеке DLL Building Thread Blocks. Вот некоторый код, который демонстрирует проблему:
template<typename T>
class concurrent_vector
{
private:
int size = 0;
int lastSegmentIndex = 0;
union
{
T* segmentsSmall[3];
T** segmentsLarge;
};
void switch_to_large()
{
//Bunch of allocations, creates a T* segmentsLarge[32] basically and reassigns all old entries into it
}
public:
concurrent_vector()
{
//The initial array is contiguous just for the sake of cache optimization
T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
segmentsSmall[0] = initialContiguousBlock;
segmentsSmall[1] = initialContiguousBlock + 2;
segmentsSmall[2] = initialContiguousBlock + 2 + 4;
}
void push_back(T& item)
{
if(size > 2 + 4 + 8)
{
switch_to_large(); //This is the problem part, there is no possible way to make this thread-safe without a mutex lock. I don't understand how Intel does it. It includes a bunch of allocations and memory copies.
}
InterlockedIncrement(&size); //Ok, so size is atomically increased
//afterwards adds the item to the appropriate slot in the appropriate segment
}
};
1 ответ
Я бы не стал делать segmentsLarge
а также segmentsSmall
союз. Да, это впустую еще один указатель. Затем указатель, давайте назовем его просто segments
может изначально указывать на сегменты Small.
С другой стороны, другие методы всегда могут использовать один и тот же указатель, что делает их проще.
А переключение с малого на большое может быть выполнено одним сравнительным обменом указателя.
Я не уверен, как это можно безопасно сделать с помощью профсоюза.
Идея выглядела бы примерно так (обратите внимание, что я использовал C++11, который предшествовал библиотеке Intel, поэтому они, вероятно, сделали это со своими атомарными внутренностями). Это, вероятно, пропускает довольно много деталей, о которых, я уверен, люди Intel больше думали, так что вам, вероятно, придется сравнить это с реализациями всех других методов.
#include <atomic>
#include <array>
#include <cstddef>
#include <climits>
template<typename T>
class concurrent_vector
{
private:
std::atomic<size_t> size;
std::atomic<T**> segments;
std::array<T*, 3> segmentsSmall;
unsigned lastSegmentIndex = 0;
void switch_to_large()
{
T** segmentsOld = segments;
if( segmentsOld == segmentsSmall.data()) {
// not yet switched
T** segmentsLarge = new T*[sizeof(size_t) * CHAR_BIT];
// note that we leave the original segment allocations alone and just copy the pointers
std::copy(segmentsSmall.begin(), segmentsSmall.end(), segmentsLarge);
for(unsigned i = segmentsSmall.size(); i < numSegments; ++i) {
segmentsLarge = nullptr;
}
// now both the old and the new segments array are valid
if( segments.compare_exchange_strong(segmentsOld, segmentsLarge)) {
// success!
return;
} else {
// already switched, just clean up
delete[] segmentsLarge;
}
}
}
public:
concurrent_vector() : size(0), segments(segmentsSmall.data())
{
//The initial array is contiguous just for the sake of cache optimization
T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
segmentsSmall[0] = initialContiguousBlock;
segmentsSmall[1] = initialContiguousBlock + 2;
segmentsSmall[2] = initialContiguousBlock + 2 + 4;
}
void push_back(T& item)
{
if(size > 2 + 4 + 8) {
switch_to_large();
}
// here we may have to allocate more segments atomically
++size;
//afterwards adds the item to the appropriate slot in the appropriate segment
}
};