Идея реализации Lock Free stack - в настоящее время не работает

У меня возникла идея, которую я пытаюсь реализовать для стека без блокировки, который не использует подсчет ссылок для решения проблемы ABA, а также правильно обрабатывает восстановление памяти. По своей концепции он похож на RCU и опирается на две функции: пометка записи списка как удаленной и отслеживание читателей, пересекающих список. Первый прост, он просто использует младший бит указателя. Последнее - моя "умная" попытка подхода к реализации неограниченного стека без блокировки.

По сути, когда какой-либо поток пытается пройти по списку, увеличивается один атомарный счетчик (list.entries). Когда обход завершен, увеличивается второй счетчик (list.exits).

Распределение узлов обрабатывается push, а освобождение - pop.

Операции push и pop довольно похожи на простую реализацию стека без блокировки, но узлы, помеченные для удаления, должны быть пройдены, чтобы получить запись без пометки. Поэтому Push в основном очень похож на вставку связанного списка.

Операция pop аналогично проходит по списку, но использует atomic_fetch_or, чтобы пометить узлы как удаленные при обходе, пока не достигнет неотмеченного узла.

Пройдя по списку из 0 или более отмеченных узлов, поток, который выталкивается, попытается CAS статьей стека. По крайней мере, один поток одновременно выталкивает поток, и после этого все читатели, входящие в стек, больше не будут видеть ранее помеченные узлы.

Поток, который успешно обновляет список, затем загружает atomic list.entries, и в основном спин-загружает atomic.exits, пока этот счетчик не превысит list.entries. Это должно подразумевать, что все читатели "старой" версии списка закончили. Затем поток просто освобождает список отмеченных узлов, которые он поменял с верхней части списка.

Таким образом, следствием операции pop должно быть (я думаю), что не может быть проблемы ABA, поскольку освобожденные узлы не возвращаются в пул полезных указателей до тех пор, пока не завершатся все одновременные читатели, использующие их, и, очевидно, проблема восстановления памяти обрабатывается также, по той же причине.

Так или иначе, это теория, но я все еще ломаю голову над реализацией, потому что в настоящее время она не работает (в многопоточном случае). Похоже, что я получаю некоторые записи после бесплатных проблем среди других вещей, но у меня возникают проблемы с обнаружением проблемы, или, возможно, мои предположения ошибочны, и это просто не будет работать.

Буду признателен за любые идеи, как по концепции, так и по подходам к отладке кода.

Вот мой текущий (неработающий) код (скомпилируйте с помощью gcc -D_GNU_SOURCE -std=c11 -Wall -O0 -g -pthread -o list list.c):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#include <sys/resource.h>

#include <stdio.h>
#include <unistd.h>

#define NUM_THREADS 8
#define NUM_OPS (1024 * 1024)

typedef uint64_t list_data_t;

typedef struct list_node_t {
    struct list_node_t * _Atomic next;
    list_data_t data;
} list_node_t;

typedef struct {
    list_node_t * _Atomic head;
    int64_t _Atomic size;
    uint64_t _Atomic entries;
    uint64_t _Atomic exits;
} list_t;

enum {
    NODE_IDLE    = (0x0),
    NODE_REMOVED = (0x1 << 0),
    NODE_FREED   = (0x1 << 1),
    NODE_FLAGS    = (0x3),
};

static __thread struct {
    uint64_t add_count;
    uint64_t remove_count;
    uint64_t added;
    uint64_t removed;
    uint64_t mallocd;
    uint64_t freed;
} stats;

#define NODE_IS_SET(p, f) (((uintptr_t)p & f) == f)
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)p | f))
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)p & ~f))
#define NODE_POINTER(p) ((void *)((uintptr_t)p & ~NODE_FLAGS))

list_node_t * list_node_new(list_data_t data)
{
    list_node_t * new = malloc(sizeof(*new));
    new->data = data;
    stats.mallocd++;

    return new;
}

void list_node_free(list_node_t * node)
{
    free(node);
    stats.freed++;
}

static void list_add(list_t * list, list_data_t data)
{
    atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst);

    list_node_t * new = list_node_new(data);
    list_node_t * _Atomic * next = &list->head;
    list_node_t * current = atomic_load_explicit(next,  memory_order_seq_cst);
    do
    {
        stats.add_count++;
        while ((NODE_POINTER(current) != NULL) &&
                NODE_IS_SET(current, NODE_REMOVED))
        {
                stats.add_count++;
                current = NODE_POINTER(current);
                next = &current->next;
                current = atomic_load_explicit(next, memory_order_seq_cst);
        }
        atomic_store_explicit(&new->next, current, memory_order_seq_cst);
    }
    while(!atomic_compare_exchange_weak_explicit(
            next, &current, new,
            memory_order_seq_cst, memory_order_seq_cst));

    atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst);
    atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
    stats.added++;
}

static bool list_remove(list_t * list, list_data_t * pData)
{
    uint64_t entries = atomic_fetch_add_explicit(
            &list->entries, 1, memory_order_seq_cst);

    list_node_t * start = atomic_fetch_or_explicit(
            &list->head, NODE_REMOVED, memory_order_seq_cst);
    list_node_t * current = start;

    stats.remove_count++;
    while ((NODE_POINTER(current) != NULL) &&
            NODE_IS_SET(current, NODE_REMOVED))
    {
        stats.remove_count++;
        current = NODE_POINTER(current);
        current = atomic_fetch_or_explicit(&current->next,
                NODE_REMOVED, memory_order_seq_cst);
    }

    uint64_t exits = atomic_fetch_add_explicit(
            &list->exits, 1, memory_order_seq_cst) + 1;

    bool result = false;
    current = NODE_POINTER(current);
    if (current != NULL)
    {
        result = true;
        *pData = current->data;

        current = atomic_load_explicit(
                &current->next, memory_order_seq_cst);

        atomic_fetch_add_explicit(&list->size,
                -1, memory_order_seq_cst);

        stats.removed++;
    }

    start = NODE_SET_FLAG(start, NODE_REMOVED);
    if (atomic_compare_exchange_strong_explicit(
            &list->head, &start, current,
            memory_order_seq_cst, memory_order_seq_cst))
    {
        entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
        while ((int64_t)(entries - exits) > 0)
        {
            pthread_yield();
            exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
        }

        list_node_t * end = NODE_POINTER(current);
        list_node_t * current = NODE_POINTER(start);
        while (current != end)
        {
            list_node_t * tmp = current;
            current = atomic_load_explicit(&current->next, memory_order_seq_cst);
            list_node_free(tmp);
            current = NODE_POINTER(current);
        }
    }

    return result;
}

static list_t list;

pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;

void * thread_entry(void * arg)
{
    sleep(2);
    int id = *(int *)arg;

    for (int i = 0; i < NUM_OPS; i++)
    {
        bool insert = random() % 2;

        if (insert)
        {
            list_add(&list, i);
        }
        else
        {
            list_data_t data;
            list_remove(&list, &data);
        }
    }

    struct rusage u;
    getrusage(RUSAGE_THREAD, &u);

    pthread_mutex_lock(&ioLock);
    printf("Thread %d stats:\n", id);
    printf("\tadded = %lu\n", stats.added);
    printf("\tremoved = %lu\n", stats.removed);
    printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed));
    printf("\tadded count = %lu\n", stats.add_count);
    printf("\tremoved count = %lu\n", stats.remove_count);
    printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
    printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
    printf("\tmallocd = %lu\n", stats.mallocd);
    printf("\tfreed = %lu\n", stats.freed);
    printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed));
    printf("\tutime = %f\n", u.ru_utime.tv_sec
            + u.ru_utime.tv_usec / 1000000.0f);
    printf("\tstime = %f\n", u.ru_stime.tv_sec
                    + u.ru_stime.tv_usec / 1000000.0f);
    pthread_mutex_unlock(&ioLock);

    return NULL;
}

int main(int argc, char ** argv)
{
    struct {
            pthread_t thread;
            int id;
    }
    threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++)
    {
        threads[i].id = i;
        pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
    }

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i].thread, NULL);
    }

    printf("Size = %ld\n", atomic_load(&list.size));

    uint32_t count = 0;

    list_data_t data;
    while(list_remove(&list, &data))
    {
        count++;
    }
    printf("Removed %u\n", count);
}

1 ответ

Решение

Вы упоминаете, что пытаетесь решить проблему ABA, но описание и код на самом деле являются попыткой решить более сложную проблему: проблему восстановления памяти.

Эта проблема обычно возникает в функциональности "удаления" коллекций без блокировок, реализованных на языках без сборки мусора. Основная проблема заключается в том, что поток, удаляющий узел из общей структуры, часто не знает, когда безопасно освободить удаленный узел, поскольку другие чтения могут по-прежнему иметь ссылку на него. Решение этой проблемы часто, как побочный эффект, также решает проблему ABA: речь идет о успешной операции CAS, даже если базовый указатель (и состояние объекта) за это время был изменен по крайней мере дважды, заканчиваясь первоначальное значение, но представляет совершенно другое состояние.

Проблема ABA проще в том смысле, что существует несколько простых решений проблемы ABA, в частности, которые не приводят к решению проблемы "восстановления памяти". Также проще в том смысле, что аппаратное обеспечение, которое может обнаружить модификацию местоположения, например, с помощью LL/SC или примитивов транзакционной памяти, может вообще не показывать проблему.

Тем не менее, вы ищете решение проблемы восстановления памяти, и это также позволит избежать проблемы ABA.

Суть вашей проблемы заключается в следующем утверждении:

Поток, который успешно обновляет список, затем загружает atomic list.entries, и в основном спин-загружает atomic.exits, пока этот счетчик не превысит list.entries. Это должно подразумевать, что все читатели "старой" версии списка закончили. Затем поток просто освобождает список отмеченных узлов, которые он поменял с верхней части списка.

Эта логика не работает. В ожидании list.exits (вы говорите atomic.exits, но я думаю, что это опечатка, так как вы говорите только о list.exits в другом месте) быть больше, чем list.entries только говорит вам, что теперь было больше общего числа выходов, чем было записей в тот момент, когда поток изменений захватил количество записей. Однако эти выходы могли быть вызваны приходом и уходом новых читателей: это вовсе не означает, что все старые читатели закончили, как вы утверждаете!

Вот простой пример. Сначала написание темы T1 и нить для чтения T2 получить доступ к списку в то же время, так list.entries это 2 и list.exits равно 0. Поток записи извлекает узел и сохраняет текущее значение (2) из list.entries и ждет lists.exits быть больше 2. Теперь еще три темы чтения, T3, T4, T5 прибыть и быстро прочитать список и уйти. Сейчас lists.exits 3, и ваше условие выполнено и T1 освобождает узел. T2 никуда не делся и взрывается, так как читает освобожденный узел!

Основная идея, которая у вас есть, может работать, но, в частности, ваш двухсторонний подход определенно не работает.

Это хорошо изученная проблема, поэтому вам не нужно изобретать собственный алгоритм (см. Ссылку выше) или даже писать собственный код, поскольку такие вещи, как librcu и concurrencykit уже существуют.

Для образовательных целей

Если вы хотите сделать эту работу в образовательных целях, один из подходов состоит в том, чтобы убедиться, что потоки, входящие после начала модификации, используют другой набор list.entry/exit счетчики. Один из способов сделать это - счетчик генерации, и, когда автор хочет изменить список, он увеличивает счетчик генерации, что заставляет новых читателей переключаться на другой набор. list.entry/exit счетчики.

Теперь писателю остается только ждать list.entry[old] == list.exists[old] Это означает, что все старые читатели ушли. Вы могли бы также просто уйти с одним счетчиком на поколение: вы на самом деле не два entry/exit счетчики (хотя это может помочь уменьшить раздор).

Конечно, вы знаете, есть новая проблема управления этим списком отдельных счетчиков для поколения... который выглядит как первоначальная проблема создания списка без блокировок! Эта проблема немного проще, хотя, потому что вы можете установить разумные ограничения на количество поколений "в полете" и просто распределить их все заранее, или вы можете реализовать ограниченный тип списка без блокировок, который легче рассуждать потому что добавления и удаления происходят только в голове или хвосте.

Другие вопросы по тегам