Очередь без блокировки
Также я делаю c
реализации и в настоящее время имеют структуру очереди:
typedef struct queueelem {
queuedata_t data;
struct queueelem *next;
} queueelem_t;
typedef struct queue {
int capacity;
int size;
queueelem_t *head;
queueelem_t *tail;
} queue_t;
queue_t *
queue_init(int capacity)
{
queue_t *q = (queue_t *) malloc(sizeof(queue_t));
q->head = q->tail = NULL;
q->size = 0;
q->capacity = capacity;
return q;
}
int CompareAndExchange (void **a, void *comparand,void *new) {
int success = 0;
pthread_mutex_lock(&CE_MUTEX);
if ((*a) != comparand) {
(*a) = new;
//return TRUE
success = 1;
}
pthread_mutex_unlock(&CE_MUTEX);
//return FALSE
return success;
}
Но не уверен, как продолжить, с функциями очереди и очереди...
- Как будет выглядеть код?
4 ответа
Ваш псевдокод может (и, скорее всего, имеет) страдать от проблемы ABA, так как проверяется только указатель, а не сопровождающий уникальный штамп, вы найдете эту статью полезной в этом отношении и в качестве общего руководства по блокировке. реализация свободной очереди с ее подводными камнями.
Когда имеешь дело с программированием без блокировок, полезно также ознакомиться с работами Херба Саттера, поскольку он дает хорошие, проницательные объяснения того, что требуется, почему это требуется и его потенциальные слабые места (хотя следует помнить, что некоторые из его более старых публикаций / статей где обнаружены некоторые скрытые / непредвиденные проблемы).
Некоторое время назад я нашел хорошее решение этой проблемы. Я считаю, что это самый маленький из найденных до сих пор.
В репозитории есть пример того, как использовать его для создания N потоков (читателей и писателей), а затем заставить их совместно использовать одно рабочее место.
Я провел несколько тестов на тестовом примере и получил следующие результаты (в миллионах операций в секунду):
По размеру буфера
По количеству потоков
Обратите внимание, как количество потоков не влияет на пропускную способность.
Думаю, это окончательное решение этой проблемы. Он работает невероятно быстро и просто. Даже с сотнями потоков и очередью на одну позицию. Его можно использовать как конвейер между потоками, выделяя пространство внутри очереди.
В репозитории есть несколько ранних версий, написанных на C# и паскале. Я работаю над тем, чтобы сделать что-то более совершенное, чтобы показать его настоящие возможности.
Я надеюсь, что некоторые из вас смогут подтвердить работу или помочь с некоторыми идеями. Или, по крайней мере, можно его сломать?
А также недавний разговор на эту тему: https://github.com/boostcon/2011_presentations/raw/master/wed/lockfree_2011_slides.pdf
Вы можете попробовать эту библиотеку, она встроена в c native. lfqueue
Например
int* int_data;
lfqueue_t my_queue;
if (lfqueue_init(&my_queue) == -1)
return -1;
/** Wrap This scope in other threads **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
while (lfqueue_enq(&my_queue, int_data) == -1) {
printf("ENQ Full ?\n");
}
/** Wrap This scope in other threads **/
/*Dequeue*/
while ( (int_data = lfqueue_deq(&my_queue)) == NULL) {
printf("DEQ EMPTY ..\n");
}
// printf("%d\n", *(int*) int_data );
free(int_data);
/** End **/
lfqueue_destroy(&my_queue);
(Оставив это здесь пока, но см. Редактировать.)
Знаете ли вы реализацию блокировки без очереди в C?
Недавно я написал очередь без блокировки ( http://www.ideone.com/l2QRp). На самом деле я не могу гарантировать, что он работает правильно, но я не могу найти никаких ошибок, и я использовал его в нескольких однопоточных программах без проблем, так что в этом нет ничего слишком очевидного.
Тривиальный пример использования:
queue_t queue;
int val = 42;
queue_init(&queue,sizeof val);
queue_put(&queue,&val);
val = 0;
queue_pop(&queue,&val);
printf("%i\n",val); // 42
queue_destroy(&queue);
Редактировать:
Как отметил @ Алексей Куканов, queue_pop может потерпеть неудачу, если tmp
выталкивается, освобождается, снова выделяется и снова помещается между проверкой на нулевое значение и обменом:
if(!tmp->next) return errno = ENODATA;
/* can fail here */
} while(!sync_swap(q->head,tmp,tmp->next));
Я еще не уверен, как это исправить, но я (надеюсь) обновлю это, как только выясню это. Пока не обращайте на это внимания.