Атомарные операции для двойного связанного списка без блокировки

Я пишу двухсвязный список без блокировки, основанный на следующих документах:

"Эффективное и надежное восстановление памяти без блокировки на основе подсчета ссылок" Андерс Гиденстам, член IEEE, Марина Папатриантафилу, Хоакан Санделл и Филиппас Цигас

"Запросы без блокировки и двусвязные списки" Håkan Sundell, Philippas Tsigas

На этот вопрос мы можем отложить первую статью.

В этой статье они используют умный способ хранения флага удаления и указателя в слове. (Подробнее здесь)

Псевдокод для этого раздела в статье:

union Link
    : word
    (p,d): {pointer to Node, boolean} 

structure Node
    value: pointer to word
    prev: union Link
    next: union Link

И мой код для псевдокода выше:

template< typename NodeT >
struct LockFreeLink
{
public:
    typedef NodeT NodeType;

private:

protected:
    std::atomic< NodeT* > mPointer;

public:
    bcLockFreeLink()
    {
        std::atomic_init(&mPointer, nullptr);
    }
    ~bcLockFreeLink() {}

    inline NodeType* getNode() const throw()
    {
        return std::atomic_load(&mPointer, std::memory_order_relaxed);
    }
    inline std::atomic< NodeT* >* getAtomicNode() const throw()
    {
        return &mPointer;
    }
};

struct Node : public LockFreeNode
{
    struct Link : protected LockFreeLink< Node >
    {
        static const int dMask = 1;
        static const int ptrMask = ~dMask;

        Link() { } throw()
        Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw()
        { 
            std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); 
        }

        Node* pointer() const throw() 
        { 
            return reinterpret_cast<Node*>(
                std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); 
        }
        bool del() const throw() 
        { 
            return std::atomic_load(&data, std::memory_order_relaxed) & dMask; 
        }
        bool compareAndSwap(const Link& pExpected, const Link& pNew) throw() 
        { 
            Node* lExpected = std::atomic_load(&pExpected.mPointer, std::memory_order_relaxed);
            Node* lNew = std::atomic_load(&pNew.mPointer, std::memory_order_relaxed);

            return std::atomic_compare_exchange_strong_explicit(
                &mPointer,
                &lExpected,
                lNew,
                std::memory_order_relaxed,
                std::memory_order_relaxed); 
        }

        bool operator==(const Link& pOther) throw() 
        { 
            return std::atomic_load(data, std::memory_order_relaxed) == 
                std::atomic_load(pOther.data, std::memory_order_relaxed); 
        }
        bool operator!=(const Link& pOther) throw() 
        { 
            return !operator==(pOther); 
        }
    };

    Link mPrev;
    Link mNext;
    Type mData;

    Node() {};
    Node(const Type& pValue) : mData(pValue) {};
};

В этой статье есть эта функция для установки метки удаления ссылки на true:

procedure SetMark(link: pointer to pointer to Node)
    while true do
       node = *link;
       if node.d = true or CAS(link, node, (node.p, true)) then break;

И мой код для этой функции:

void _setMark(Link* pLink)
{
    while (bcTRUE)
    {
        Link lOld = *pLink;
        if(pLink->del() || pLink->compareAndSwap(lOld, Link(pLink->pointer(), bcTRUE)))
            break;
    }
}

Но моя проблема в compareAndSwap функция, где я должен сравнить и поменять местами три атомные переменные. Информация о проблеме здесь

(На самом деле new переменная в функции сравнения и обмена не важна, потому что она локальна для потока)

Теперь мой вопрос: как я могу написать функцию compareAndSwap для сравнения и замены трех атомных переменных или где я делаю ошибку?

(Извините за длинный вопрос)

Редактировать:

Аналогичная проблема в работе менеджера памяти:

function CompareAndSwapRef(link:pointer to pointer toNode,
old:pointer toNode, new:pointer toNode):boolean
    if CAS(link,old,new) then
        if new=NULL then
            FAA(&new.mmref,1);
            new.mmtrace:=false;
    if old=NULLthen FAA(&old.mmref,-1);
    return true;
return false; 

и здесь я должен сравнить и поменять местами три атомные переменные. (Обратите внимание, что мои аргументы типа Link и я должен сравнить и поменять местами mPointer из Link)

3 ответа

Если вы не можете сделать свои три элемента данных, которые вы сравниваете / заменяете, вписываясь в два элемента размером с указатель, вы не можете сделать это с помощью сравнения и замены (конечно, не на x86, и я не слышал ни о какой другой архитектуре машины, которая есть такая вещь).

Если вы полагаетесь на данные, хранящиеся по адресу, который (по крайней мере) выровнен с четным байтовым адресом, вы можете использовать побитовое ИЛИ для установки младшего бита при удалении элемента. В прошлом люди использовали верхние части адреса для хранения дополнительных данных, но, по крайней мере, в x86-64 это невозможно, так как верхняя часть адреса должна быть "канонической", что означает, что любые биты адреса выше "полезного предела" (определяемого архитектурой процессора, в настоящее время это 48 битов), все они должны совпадать с максимальным битом допустимого предела (так же, как и бит 47).

Редактировать: этот раздел кода делает именно то, что я описываю:

    static const int dMask = 1;
    static const int ptrMask = ~dMask;

    Link() { } throw()
    Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw()
    { 
        std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); 
    }

    Node* pointer() const throw() 
    { 
        return reinterpret_cast<Node*>(
            std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); 
    }

Он использует младший бит для хранения pDel флаг.

Вы должны быть в состоянии сделать это для двойного связанного списка, используя форму cmpxchg16b (на x86). В системе Windows это было бы _InterlockedCompareExchange128, В gcc (для ОС типа Unix, такой как Linux/MacOS) вам необходимо сначала создать int128 из ваших двух указателей. Если вы компилируете для 32-битного кода, вам, вероятно, потребуется сделать 64-битный int как для Windows, так и для Unix.

На x64 используется только 44 бита адресного пространства. Если ваши указатели выровнены до 8 байтов, то вы используете только 41 бит. 41x2 все еще слишком велик для 64 бит. Есть 128-битное сравнение и своп, хотя я не могу ручаться за его скорость. Я всегда стараюсь использовать 64-битную.

Может быть, вам нужно только до 2 миллиардов узлов. Итак, что вы можете сделать, это предварительно выделить пул узлов, из которых извлекается список. Вы создаете узлы, захватывая следующий индекс свободного пула, используя атомарные операции. Тогда вместо указателей next и prev, это могут быть 31-битные индексы в пуле узлов, и у вас останется 2 бита для флагов удаления. Предполагая, что вам не нужно 2 миллиарда узлов, у вас осталось еще больше битов. Единственным недостатком является то, что вы должны знать, сколько узлов вам понадобится при запуске, хотя вы могли бы перераспределить узлы, если бы у вас было тоже.

Я использовал функции виртуальной памяти, чтобы зарезервировать ГБ адресного пространства, а затем отобразить физический ОЗУ в это пространство, так как мне это нужно для расширения пула без необходимости перераспределения.

http://www.drdobbs.com/cpp/lock-free-code-a-false-sense-of-security/210600279

Но замена замков оптом путем написания собственного кода без блокировки - это не решение. Код без блокировки имеет два основных недостатка. Во-первых, он не очень полезен для решения типичных проблем - многие базовые структуры данных, даже двусвязные списки, до сих пор не имеют известных реализаций без блокировок. Использование новой или улучшенной структуры данных без блокировок все равно принесет вам по крайней мере опубликованную статью в рецензируемом журнале, а иногда и степень.

Я не думаю, что было бы достаточно эффективно использовать его, но в любом случае это интересно читать.

Другие вопросы по тегам