Реализация многопользовательского буфера pthread одного производителя с использованием условных переменных
Я пишу наивную реализацию буфера с несколькими потребителями одного производителя с pthreads и условными переменными, используя список C++ в качестве буфера. Меня не слишком беспокоит скорость выполнения моего кода, я просто хочу избавиться от ошибок.
Поток производителя читает строку из файла и вставляет ее в конец буфера, в то время как каждый из потребителей читает с самого начала и помещает ее в свою матрицу. Итак, по сути, у меня есть очередь FIFO, которая имеет максимальный размер, и первый элемент может быть удален только тогда, когда все потребители уже прочитали его.
Вот важная часть трех функций моего кода:
РЕЖИССЕР:
void *feedBuffer(void *threadproducer){
//some declarations...
while(!file->eof())
{
pthread_mutex_lock(&mutex);
while(*buffer_current_size == buffer_max_size) { // full
// wait until some elements are consumed
pthread_cond_wait(&can_produce, &mutex);
}
pthread_mutex_lock(&lock_buffer);
*file >> temp.word;
buffer->push_back(temp);
(*buffer_current_size)++;
pthread_mutex_unlock(&lock_buffer);
pthread_cond_broadcast(&can_consume);
pthread_mutex_unlock(&mutex);
}
file->close();
pthread_cond_broadcast(&can_consume);
pthread_mutex_lock(&lock_buffer);
buffer_current_size->store(-1); //END OF READ SIGNAL
pthread_mutex_unlock(&lock_buffer);
pthread_exit(NULL);
}
БУФЕРНЫЙ КОНТРОЛЛЕР И РАБОЧИЙ РЕЗЬБЕР:
void *main_consumer(void *threadconsumer) //consumer caller and buffer controll
{
//some declarations...
for(int j=0; j<NUMTHREADS; j++)
{
pthread_create(&threads[j],&attr,worker,(void *) &workerargs[j]);
}
//BUFFER CONTROLLER
pthread_mutex_lock(&lock_buffer);
while(*buffer_current_size!=-1){ //WHILE READ HASN'T ENDED
pthread_mutex_unlock(&lock_buffer); //UNLOCK AND LOCK AGAIN TO LET OTHER THREADS HOLD THE LOCK FOR A WHILE
pthread_mutex_lock(&lock_buffer);
it=buffer->begin(); //GET FIRST ELEMENT OF THE BUFFER
if(it->cnt == NUMTHREADS){
buffer->pop_front(); //DELETE FIRST ELEMENT
(*buffer_current_size)--; //DECREASE SIZE
pthread_cond_signal(&can_produce); //PRODUCER CAN PRODUCE
}
}
pthread_mutex_unlock(&lock_buffer);
for(int i=0; i<NUMTHREADS; i++)
{
pthread_join(threads[i],NULL);
}
}
РАБОЧИЙ:
void *worker(void *threadwoker)
{
//some declarations...
pthread_mutex_lock(&lock_buffer); //LOCK TO GET BEGIN
it=buffer->begin();
while(!(*buffer_current_size==-1 && it==args->buffer->end())) {
pthread_mutex_unlock(&lock_buffer);
//insert into matrix...
pthread_mutex_lock(&lock_buffer); //UNIFIED LOCK FOR IT AND CNT, SOLVING ISSUE
(it->cnt)++;
it++;
pthread_mutex_unlock(&lock_buffer);
pthread_mutex_lock(&mutex);
while (*buffer_current_size==0) { //WAIT IF BUFFER EMPTY
pthread_cond_wait(&can_consume, &mutex);
}
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&lock_buffer); //LOCKING FOR WHILE ARGUMENTS
}
pthread_mutex_unlock(&lock_buffer);
pthread_exit(NULL);
}
Как вы можете видеть, я использовал счетчик int для каждого элемента буфера, чтобы проверить, все ли рабочие потоки его уже прочитали. Когда это условие становится истинным, контроллер буфера удаляет первый элемент из очереди. Все ограничено замками, чтобы гарантировать целостность данных.
Проблема в том, что этот код не работает, я либо получаю ошибку сегмента, либо ошибку мьютекса. Может кто-нибудь просветить с какими-либо идеями, почему?
1 ответ
Во-первых, неясно, какие именно структуры данных защищены каждым мьютексом. Я предлагаю, чтобы, по крайней мере, для первоначальной реализации вы упростили до одного мьютекса, защищающего все разделяемое состояние - это счетчик размера буфера, сам буфер и счетчик в рабочих элементах.
Что касается конкретных вопросов:
- Производитель должен повторно проверить состояние после
pthread_cond_wait()
(это должно бытьwhile ()
петля, а неif ()
заявление); - когда продюсер заканчивает, он получает доступ
*buffer_current_size
без блокировки и не сигнализирует ожидающим потребителям; - Доступ к контроллеру буфера
*buffer_current_size
без блокировки удерживается; - Рабочий доступ
buffer->begin()
а такжеbuffer->end()
без блокировки удерживается; - Рабочий доступ
*buffer_current_size
без блокировки удерживается; - Рабочий звонит
pthread_mutex_trylock(&mutex)
без проверки результата, что означает, что он может получить доступ к общему состоянию и разблокировать мьютекс без блокировки; - Работник должен перепроверить состояние ожидания после вызова
pthread_cond_wait()
; - Работник обращается к итератору
it
без удержания блокировки, что проблематично из-за того, что другие потоки модифицируют базовый::list
- поскольку этот поток уже увеличил счетчик, контроллер буфера мог уже удалить элемент, на который указывает итератор, что означает, что вы не можете увеличить итератор.