Есть ли готовая к производству очередь без блокировки или хэш-реализация в C++?
Я довольно долго гуглил очередь без блокировки в C++. Я нашел некоторый код и несколько испытаний - но ничего, что я смог собрать. Хэш без блокировки также приветствуется.
РЕЗЮМЕ: Пока у меня нет положительного ответа. Здесь нет библиотеки, готовой к работе, и, что удивительно, ни одна из существующих библиотек не соответствует API контейнеров STL.
15 ответов
Начиная с 1.53, boost предоставляет набор структур данных без блокировок, включая очереди, стеки и очереди одного производителя / одного потребителя (т.е. кольцевые буферы).
Отправной точкой могут стать статьи DDJ Херба Саттера для одного производителя и потребителя или для нескольких. Код, который он дает (встроенный, начиная со второй страницы каждой статьи), использует атомарный тип шаблона
Код повышения скрыт в недрах межпроцессной библиотеки, но, прочитав соответствующий заголовочный файл (atomic.hpp), реализует необходимые операции сравнения и замены в системах, с которыми я знаком, с помощью звука взгляда.
Да!
Я написал очередь без блокировки. Особенности:
- Полностью без ожидания (без петель CAS)
- Сверхбыстрый (более ста миллионов операций ввода-вывода в секунду)
- Использует семантику перемещения C++11
- Растет по мере необходимости (но только если вы этого хотите)
- Осуществляет управление памятью без блокировок для элементов (используя заранее выделенные смежные блоки)
- Автономный (два заголовка плюс лицензия и readme)
- Компилируется под MSVC2010+, Intel ICC 13 и GCC 4.7.2 (и должно работать с любым полностью совместимым компилятором C++11)
Он доступен на GitHub под упрощенной лицензией BSD (не стесняйтесь его раскошелиться!).
Предостережения:
- Только для однопользовательской архитектуры одного производителя (т. Е. Двух потоков)
- Тщательно протестировано на x86(-64) и должно работать на ARM, PowerPC и других процессорах, где выровненные целочисленные целые числа и загрузки и хранилища указателей естественно атомарные, но не тестировались в полевых условиях на процессорах не-x86 (если кто-то один, чтобы проверить это, дайте мне знать)
- Не знаю, будут ли нарушены какие-либо патенты (используйте на свой страх и риск и т. Д.). Обратите внимание, что я разработал и реализовал это сам с нуля.
Похоже, что у Facebook в Facebook есть блокировка свободных структур данных на основе C++ 11 <atomic>
:
ProducerConsumerQueue с документами и примером кода здесь.
AtomicHashMap с документами и примером кода здесь
Я бы осмелился сказать, что они в настоящее время используются в производстве, поэтому я думаю, что они могли бы безопасно использоваться в других проектах.
Ура!
boost.lockfree - это попытка создания реализаций C++ для стека lockfree и классов fifo.
После проверки большинства приведенных ответов я могу только заявить:
Ответ НЕТ.
Нет такого права, что можно было бы использовать прямо из коробки.
Самая близкая вещь, о которой я знаю, - это односторонне связанные списки, связанные с Windows. Конечно, это только Windows.
Если у вас есть очередь /FIFO с несколькими производителями / одним потребителем, вы можете легко создать один LockFree с помощью SLIST или тривиального стека LIFO без блокировки. То, что вы делаете, - это второй "приватный" стек для потребителя (который также можно сделать как SLIST для простоты или любой другой модели стека, которую вы выберете). Потребитель выталкивает предметы из частного стека. Всякий раз, когда используется приватный LIFO, вы выполняете Flush, а не вытаскиваете из общего параллельного SLIST (захватывает всю цепочку SLIST), а затем идете по списку Flushed по порядку, помещая элементы в приватный стек.
Это работает для одного производителя / одного потребителя и для нескольких производителей / одного потребителя.
Однако это не работает для случаев с несколькими потребителями (с одним или несколькими производителями).
Кроме того, что касается хеш-таблиц, они являются идеальным кандидатом для "чередования", которое просто делит хеш на сегменты, имеющие блокировку на сегменты кэша. Вот как это делает параллельная библиотека Java (используя 32 полосы). Если у вас есть легкая блокировка чтения-записи, к хеш-таблице можно одновременно обращаться для одновременного чтения, и вы будете останавливаться только тогда, когда запись происходит по оспариваемым полосам (и, возможно, если вы разрешите увеличение хеш-таблицы).
Если вы свернули свои собственные, убедитесь, что чередуете свои блокировки с хеш-записями, а не размещайте все свои блокировки в массиве рядом друг с другом, чтобы у вас была меньше шансов на ложное совместное использование.
Я могу немного опоздать на это.
Отсутствие решений (на вопрос был задан) в основном связано с важной проблемой в C++ (до C++0x/11): C++ не имеет (не имеет) параллельной модели памяти.
Теперь, используя std::atomic, вы можете контролировать проблемы с упорядочением памяти и выполнять правильные операции сравнения и замены. Я написал себе реализацию очереди без блокировок Micheal&Scott (PODC96), использующей C++11 и указатели опасности Micheal (IEEE TPDS 2004), чтобы избежать проблем с ранней свободой и ABA. Он работает нормально, но это быстрая и грязная реализация, и я не удовлетворен фактическими выступлениями. Код доступен на bitbucket: LockFreeExperiment
Также возможно реализовать очередь без блокировок без указателей опасности с использованием двойных слов CAS (но 64-битные версии будут возможны только на x86-64 с использованием cmpxchg16b), об этом я пишу в блоге (с непроверенным кодом для очереди) здесь: Реализация общего сравнения двух слов и замены для x86 / x86-64 (блог LSE.)
Мой собственный тест показывает, что очередь с двойной блокировкой (также в статье Micheal&Scott 1996) работает так же, как и без блокировки (у меня недостаточно разногласий, так что у заблокированных структур данных есть проблемы с производительностью, но мой стенд слишком мал для сейчас) и параллельная очередь от Intel TBB кажется даже лучше (в два раза быстрее) для относительно небольшого числа (в зависимости от операционной системы, в FreeBSD 9, самой низкой границы, которую я нашел до сих пор, это число составляет 8 потоков на i7 с 4-х ядерным ядром и, следовательно, с 8 логическими процессорами) потоков и имеет очень странное поведение (время выполнения моего простого теста перемещается от секунд до часов!)
Еще одно ограничение для очередей без блокировок, следующих стилю STL: бессмысленно иметь итераторы в очереди без блокировок.
А потом пришли http://www.threadingbuildingblocks.org/. И какое-то время это было хорошо.
PS: вы ищете concurrent_queue и concurrent_hash_map
Ниже приводится статья Херба Саттера о Очередь без параллельной блокировки без блокировки http://www.drdobbs.com/parallel/writing-a-generalized-concurrent-queue/211601363?pgno=1. Я сделал некоторые изменения, такие как изменение порядка компиляции. Для компиляции этого кода требуется GCC v4.4+.
#include <atomic>
#include <iostream>
using namespace std;
//compile with g++ setting -std=c++0x
#define CACHE_LINE_SIZE 64
template <typename T>
struct LowLockQueue {
private:
struct Node {
Node( T* val ) : value(val), next(nullptr) { }
T* value;
atomic<Node*> next;
char pad[CACHE_LINE_SIZE - sizeof(T*)- sizeof(atomic<Node*>)];
};
char pad0[CACHE_LINE_SIZE];
// for one consumer at a time
Node* first;
char pad1[CACHE_LINE_SIZE
- sizeof(Node*)];
// shared among consumers
atomic<bool> consumerLock;
char pad2[CACHE_LINE_SIZE
- sizeof(atomic<bool>)];
// for one producer at a time
Node* last;
char pad3[CACHE_LINE_SIZE
- sizeof(Node*)];
// shared among producers
atomic<bool> producerLock;
char pad4[CACHE_LINE_SIZE
- sizeof(atomic<bool>)];
public:
LowLockQueue() {
first = last = new Node( nullptr );
producerLock = consumerLock = false;
}
~LowLockQueue() {
while( first != nullptr ) { // release the list
Node* tmp = first;
first = tmp->next;
delete tmp->value; // no-op if null
delete tmp;
}
}
void Produce( const T& t ) {
Node* tmp = new Node( new T(t) );
asm volatile("" ::: "memory"); // prevent compiler reordering
while( producerLock.exchange(true) )
{ } // acquire exclusivity
last->next = tmp; // publish to consumers
last = tmp; // swing last forward
producerLock = false; // release exclusivity
}
bool Consume( T& result ) {
while( consumerLock.exchange(true) )
{ } // acquire exclusivity
Node* theFirst = first;
Node* theNext = first-> next;
if( theNext != nullptr ) { // if queue is nonempty
T* val = theNext->value; // take it out
asm volatile("" ::: "memory"); // prevent compiler reordering
theNext->value = nullptr; // of the Node
first = theNext; // swing first forward
consumerLock = false; // release exclusivity
result = *val; // now copy it back
delete val; // clean up the value
delete theFirst; // and the old dummy
return true; // and report success
}
consumerLock = false; // release exclusivity
return false; // report queue was empty
}
};
int main(int argc, char* argv[])
{
//Instead of this Mambo Jambo one can use pthreads in Linux to test comprehensively
LowLockQueue<int> Q;
Q.Produce(2);
Q.Produce(6);
int a;
Q.Consume(a);
cout<< a << endl;
Q.Consume(a);
cout<< a << endl;
return 0;
}
Насколько мне известно, пока нет такой общедоступной вещи. Одна проблема, которую должен решить разработчик, заключается в том, что вам нужен распределитель памяти без блокировки, который существует, хотя сейчас я не могу найти ссылку.
Я написал это в какой-то момент, вероятно, еще в 2010 году, я уверен, с помощью разных ссылок. Это мультипроизводитель одного потребителя.
template <typename T>
class MPSCLockFreeQueue
{
private:
struct Node
{
Node( T val ) : value(val), next(NULL) { }
T value;
Node* next;
};
Node * Head;
__declspec(align(4)) Node * InsertionPoint; //__declspec(align(4)) forces 32bit alignment this must be changed for 64bit when appropriate.
public:
MPSCLockFreeQueue()
{
InsertionPoint = new Node( T() );
Head = InsertionPoint;
}
~MPSCLockFreeQueue()
{
// release the list
T result;
while( Consume(result) )
{
//The list should be cleaned up before the destructor is called as there is no way to know whether or not to delete the value.
//So we just do our best.
}
}
void Produce( const T& t )
{
Node * node = new Node(t);
Node * oldInsertionPoint = (Node *) InterLockedxChange((volatile void **)&InsertionPoint,node);
oldInsertionPoint->next = node;
}
bool Consume( T& result )
{
if (Head->next)
{
Node * oldHead = Head;
Head = Head->next;
delete oldHead;
result = Head->value;
return true;
}
return false; // else report empty
}
};
Я нашел другое решение, написанное на c: