Безблокировочные структуры данных C++, невозможно?

Я действительно не понимаю, как вы можете сделать некоторые структуры данных без блокировки. Например, если у вас есть связанный список, то вы либо окружаете операции мьютексами, ИЛИ вы можете получить условие гонки, если другой поток выполняется, пока вы заняты повторным связыванием новых узлов вместе.

Концепция "без блокировки" (я понимаю, что это не означает "без блокировок", но означает, что потоки могут развиваться, не дожидаясь завершения других потоков), просто не имеет смысла.

Может кто-нибудь показать мне простой пример с использованием стека, очереди или связанного списка и т. Д., Который реализован как "без блокировки", потому что я не могу понять, как можно предотвратить состояние гонки, не влияя на производительность других потоков? Наверняка эти две цели противоречат друг другу?

7 ответов

Структуры данных без блокировки используют атомарные операции и могут налагать дополнительные требования. Например, структура данных может быть безопасной только для одного читающего и одного записывающего потока или любой другой комбинации. В случае простого связанного списка будут использоваться атомарные операции чтения и записи в указатель узла, чтобы гарантировать, что несколько потоков могут безопасно читать и записывать в него одновременно.

Вы можете или не можете сойти с рук только с этим. Если вам нужны дополнительные гарантии относительно содержимого структуры данных и проверки, вы, вероятно, не сможете сделать это без какой-либо формы высокоуровневой блокировки. Кроме того, не каждая структура данных позволяет переписать ее так, чтобы она была свободной от блокировки, даже если принять во внимание дополнительные требования к тому, как используется структура данных. В этом случае неизменные объекты могут быть решением, но они обычно сопровождаются снижением производительности из-за копирования, что не всегда желательно из-за блокировки объекта и последующего его изменения.

Что я нахожу простым и объяснимым, так это то, что сначала вы можете написать псевдокод для структуры данных в стиле блокировки (мьютекса), а затем попытаться увидеть, как переменные, на которых вы удерживали блокировку, могут быть изменены без блокировки с помощью операций CAS. Хотя другие дали отличные ответы, я хотел бы добавить, что вы почувствуете это, только если реализуете самостоятельно, конечно, прочитав некоторый псевдокод из какой-то исследовательской статьи, в которой он был опубликован.

Вот очередь, которую я реализовал на С++ с проверочным тестированием для многопоточных запусков:

      #include<iostream>
#include<atomic>
#include<thread>
#include<vector>
#define N 1000
using namespace std;
class lf_queue
{
private:
    struct node
    {   int data;
        atomic<node*> next;
        node(int d):data(d)
        {}
    };
    atomic<node*> Head;
    atomic<node*> Tail;
public:
    lf_queue()
    {
        node *nnode= new node(-1);
        nnode->next=NULL;
        Head=nnode;
        Tail=nnode;
    }
    void enqueue(int data)
    {
        node *nnode= new node(data);
        nnode->next=NULL;
        node *tail,*next_p;
        while(true)
        {
            tail=Tail.load();
            next_p=tail->next;
            if(tail==Tail.load())
            {
                if(next_p==NULL)
                {
                    if((tail->next).compare_exchange_weak(next_p,nnode))
                    break;
                }
                else
                {
                    Tail.compare_exchange_weak(tail,next_p);
                }
            }
        }
        Tail.compare_exchange_weak(tail,nnode);
    }
    bool dequeue(int &res)
    {
        while(true)
        {
            node *head,*tail,*next_p;
            head=Head.load();
            tail=Tail.load();
            next_p=head->next;
            if(head==Head.load())
            {
                if(head==tail)
                {
                    if(next_p==NULL)
                        return false;
                    Tail.compare_exchange_weak(tail,next_p);
                }
                else
                {
                    res=next_p->data;
                    if(Head.compare_exchange_weak(head,next_p))
                        break;
                }
            }
        }//end loop
        return true;
    }
};
void producer(lf_queue &q)
{   //cout<<this_thread::get_id()<<"Inside producer\n";
    for(int i=0;i<N;i++)
    {
       q.enqueue(1);
     }
    //cout<<this_thread::get_id()<<" "<<"Finished producing\n";
}
void consumer(lf_queue &q,atomic<int>& sum)
{   //cout<<this_thread::get_id()<<" "<<"Inside consumer\n";
    for(int i=0;i<N;i++)
    {
        int res=0;
        while(!q.dequeue(res));
        sum+=res;
    }
    //cout<<this_thread::get_id()<<" "<<"Finished consuming\n";
}
int main()
{
    lf_queue Q;
    atomic<int> sum;
    sum.store(0);
    vector<thread> thread_pool;
    for(int i=0;i<10;i++)
    {   if(i%2==0)
        {   thread t(consumer,ref(Q),ref(sum));
            thread_pool.push_back(move(t));
        }
        else
        {
            thread t(producer,ref(Q));
            thread_pool.push_back(move(t));    
        }
    }
    for(int i=0;i<thread_pool.size();i++)
    thread_pool[i].join();
    cout<<"Final sum "<<sum.load()<<"\n";
    return 0;
}

Я попытался реализовать блокировку связанного списка без блокировки, используя статью Харриса, но столкнулся с осложнениями, вы видите со стилем С++ 11, вы можете выполнять CAS только для типов atomic<>, а также эти atomic<node*> не могут быть преобразованы в long с целью кражи битов, которую реализация Харриса использует для логической маркировки удаленных узлов. Однако в Интернете есть реализации кода на C, которые используют низкоуровневые операции cas_ptr, что дает большую гибкость для приведения к / от между адресами и long.

Доступны различные примитивы, которые позволяют создавать такие структуры данных без блокировки. Например, сравните-и-своп (CAS для краткости), который атомарно выполняет следующий код:

CAS(x, o, n)
  if x == o:
    x = n
    return o
  else:
    return x

С помощью этой операции вы можете делать атомарные обновления. Рассмотрим, например, очень простой связанный список, который хранит элементы в отсортированном порядке, позволяет вставлять новые элементы и проверять, существует ли элемент уже. Операция find будет работать как прежде: она будет проходить по всем ссылкам, пока не найдет элемент или не найдет элемент большего размера, чем запрос. Вставка должна быть немного более осторожной. Это может работать следующим образом:

insert(lst, x)
  xn = new-node(x)
  n = lst.head
  while True:
    n = find-before(n, x)
    xn.next = next = n.next
    if CAS(n.next, next, x) == next:
      break

find-before(n,x) просто находит элемент, который предшествует x в порядке. Это, конечно, просто набросок. Все становится сложнее, если вы хотите поддерживать удаление. Я рекомендую Херлихи и Шавита "Искусство многопроцессорного программирования". Следует также отметить, что зачастую выгодно переключать структуры данных, которые реализуют одну и ту же модель, чтобы сделать их свободными от блокировок. Например, если вы хотите реализовать эквивалент std::mapбыло бы больно делать это с красно-черным деревом, но список пропусков гораздо более управляемый.

Структура без блокировки использует атомарную инструкцию для получения права собственности на ресурсы. Атомарные инструкции блокируют переменную, которая работает на уровне кэша ЦП, и уверяют вас, что другие ядра не могут мешать работе.

Допустим, у вас есть эти атомарные инструкции:

  • читать (A) -> A
  • сравнивать и поменять (A, B, C) -> oldA = A; if (A == B) { A = C }; вернуть oldA;

С помощью этой инструкции вы можете просто создать стек:

template<typename T, size_t SIZE>
struct LocklessStack
{
public:
  LocklessStack() : top(0)
  {
  }
  void push(const T& a)
  {
     int slot;
     do
     {
       do
       {
         slot = read(top);
         if (slot == SIZE)
         {
           throw Stackru();
         }
       }while(compare_and_swap(top, slot, slot+1) == slot);
       // NOTE: If this thread stop here. Another thread pop and push
       //       a value, this thread will overwrite that value [ABA Problem].
       //       This solution is for illustrative porpoise only
       data[slot] = a;
     }while( compare_and_swap(top, slot, slot+1) == slot );
  }
  T pop()
  {
     int slot;
     T temp;
     do
     {
       slot = read(top);
       if (slot == 0)
       {
         throw StackUnderflow();
       }
       temp = data[slot-1];
     }while(compare_and_swap(top, slot, slot-1) == slot);
     return temp;
  }
private:
  volatile int top;
  T data[SIZE];
};

требуется volatile, чтобы компилятор не нарушал порядок операций во время оптимизации. Два одновременных нажатия происходят:

Первый ввод в цикл while и чтение слота, затем прибывает второй толчок, чтение сверху, успешное сравнение и обмен (CAS) и увеличение приращения. Другой поток просыпается, CAS терпит неудачу и читает другое время сверху.

Происходят два одновременных щелчка:

Действительно похоже на предыдущий случай. Нужно также прочитать значение.

Один поп и пуш происходят одновременно:

pop прочитайте верхнюю часть, прочитайте temp.. нажмите ввод и измените верхнюю часть и нажмите новое значение. Ошибка Pop CAS, pop() снова выполнит цикл и прочитает новое значение

или же

нажмите читать верхнюю часть и получите слот, нажмите ввод и измените верхнее значение. Нажать CAS не удастся, и приходится повторять цикл, нажимая на более низкий индекс.

Очевидно, что это не так в параллельной среде

stack.push(A);
B = stack.pop();
assert(A == B); // may fail

причина, в то время как толчок атомарен, и поп атомарен, комбинация их не атомарна

Первая глава игрового программирования gem 6 - хороший справочник.

Обратите внимание, что код НЕ ПРОВЕРЕН, и атомарный может быть очень неприятным.

Вот, пожалуйста, очень простой (push_front) список без блокировок:

      template <class T>
class LockFreeList {    
public:
    struct Node {
        T value_;
        Node* next_;
        Node(const T& value) : value_(value), next_(nullptr) {
        }
    };
    
    void push_front(const T& value) {
        Node* new_node = new Node(value);
        Node* old_head = head_;
        do { new_node->next_ = old_head; }
        while (!head_.compare_exchange_weak(old_head, new_node));
    }
    
private:
    std::atomic<Node*> head_;
}; 

Вдохновленный выступлением Федора Пикуса на CppCon 2017, см.: https://youtu.be/ZQFzMfHIxng?t=2432

С небольшим изменением: push_back использует compare_exchange_weak.

Ваше определение блокировки свободы неверно.

Блокировка свободы позволяет отдельным потокам голодать, но гарантирует пропускную способность всей системы. Алгоритм не блокируется, если он удовлетворяет тому, что когда программные потоки выполняются достаточно долго, по крайней мере, один из потоков делает прогресс (для некоторого разумного определения прогресса) https://en.wikipedia.org/wiki/Non-blocking_algorithm

это означает, что при доступе нескольких структур к структуре данных будет предоставлен только 1; остальное не получится

важная вещь о свободе блокировки - вероятность столкновения памяти. Структура данных, защищенная блокировками, как правило, будет быстрее, чем реализация с атомарными переменными, но она плохо масштабируется с небольшой вероятностью столкновения.

пример: несколько потоков постоянно push_back данных в вашем списке. это приведет ко многим столкновениям, и классический мьютекс в порядке. Однако если у вас есть 1 поток, выдвигающий данные в конец списка, и 1 поток, выдвигающий данные в начало, ситуация будет иной. Если список не пустой, push_back() и pop_front() не будут сталкиваться (зависит от реализации), поскольку они не работают с одним и тем же объектом. Но все еще есть изменение пустого списка, поэтому вам все еще нужно защитить доступ. В этом случае блокировка-свобода будет лучшим решением, так как вы можете вызывать обе функции одновременно без ожидания.

короче говоря: без блокировки предназначен для больших структур данных, где несколько авторов в основном разделены и редко сталкиваются.

Некоторое время назад я пытался реализовать контейнер списка без блокировки... https://codereview.stackexchange.com/questions/123201/lock-free-list-in-c

Предположим, простая операция, которая увеличивает переменную на единицу. Если вы реализуете это, используя "чтение переменной из памяти в процессор, добавление 1 в регистр процессора, запись переменной обратно", то вам придется использовать какой-то мьютекс вокруг всего этого, потому что вы хотите убедиться, что второй поток не будет читать переменную до тех пор, пока первая не запишет ее обратно.

Если ваш процессор имеет атомарную инструкцию по сборке "места увеличения памяти", блокировка вам не нужна.

Или предположим, что вы хотите вставить элемент в связанный список, что означает, что вам нужно сделать указатель начала на новый элемент, а затем указать новый элемент на элемент, который был предыдущим первым. С помощью атомарной операции "обмен двух ячеек памяти" вы можете записать текущий указатель начала в "следующий" указатель нового элемента, а затем поменять местами два указателя - теперь, в зависимости от того, какой поток запускается первым, элементы будут в разных порядок в списке, но структура данных списка остается неизменной.

По сути, это всегда "делать несколько вещей одновременно, за одну атомарную операцию, чтобы вы не могли разбить операцию на отдельные части, которые не могут быть прерваны".

Другие вопросы по тегам