Многопоточная одиночная читательская очередь fifo
Мне нужна очередь для передачи сообщений из одного потока (A) в другой (B), однако я не смог найти тот, который действительно делает то, что я хочу, так как они обычно позволяют добавить элемент, чтобы потерпеть неудачу, случай, который в моей ситуации в значительной степени фатально, так как сообщение должно быть обработано, и поток действительно не может остановиться и ждать свободного места.
- Только поток A добавляет элементы, и только поток B читает их
- Поток A никогда не должен блокироваться, однако поток B не является критичным для производительности, поэтому он может
- Добавление элементов всегда должно выполняться успешно, поэтому очередь не может иметь верхнего предела размера (за исключением нехватки памяти в системе)
- Если очередь пуста, поток B должен подождать, пока не появится элемент для обработки
4 ответа
Вот как написать очередь без блокировки в C++:
http://www.ddj.com/hpc-high-performance-computing/210604448
Но когда вы говорите "поток А не должен блокировать", вы уверены, что это требование? Windows не является операционной системой реального времени (как и Linux при обычном использовании). Если вы хотите, чтобы поток А мог использовать всю доступную системную память, он должен выделить память (или подождать, пока кто-то другой это сделает). Сама ОС не может обеспечить гарантии синхронизации лучше, чем те, которые вы имели бы, если бы и читатель, и писатель взяли на себя блокировку в процессе (т. Е. Неразделенный мьютекс) для манипулирования списком. И худший случай добавления сообщения - пойти в ОС, чтобы получить память.
Короче говоря, есть причина, по которой те очереди, которые вам не нравятся, имеют фиксированную емкость - они не должны выделять память в предположительно низко-латентном потоке.
Таким образом, код без блокировок, как правило, будет менее блочным, но из-за выделения памяти это не гарантируется, и производительность с мьютексом не должна быть такой уж плохой, если у вас нет действительно огромного потока событий для процесс (например, вы пишете сетевой драйвер и сообщения являются входящими пакетами Ethernet).
Итак, в псевдокоде первое, что я попробую, будет:
Writer:
allocate message and fill it in
acquire lock
append node to intrusive list
signal condition variable
release lock
Reader:
for(;;)
acquire lock
for(;;)
if there's a node
remove it
break
else
wait on condition variable
endif
endfor
release lock
process message
free message
endfor
Только если это приведет к недопустимым задержкам в потоке писателя, я перейду к коду без блокировки (если только у меня не оказалось подходящей очереди).
Почему бы не использовать STL <
list
> или <deque
> с мьютексом вокруг добавить / удалить? Потокобезопасность STL недостаточна?Почему бы не создать свой собственный (поодиночке / дважды) класс связанного узла-списка, который содержит указатель, и чтобы элементы, которые должны быть добавлены / удалены, наследовали от этого? Таким образом, делая дополнительное распределение ненужным. Вы просто пометите несколько указателей в
threadA::add()
а такжеthreadB::remove()
и вы сделали. (Хотя вы захотите сделать это под мьютексом, эффект блокировки на threadA будет незначительным, если вы не сделаете что-то действительно неправильное...)Если вы используете pthreads, проверьте
sem_post()
а такжеsem_wait()
, Идея состоит в том, что поток B может блокировать бесконечно черезsem_wait()
пока threadA не поместит что-то в очередь. Затем ThreadA вызываетsem_post()
, Который пробуждает threadB, чтобы сделать это работа. После чего поток B может вернуться в режим сна. Это эффективный способ обработки асинхронной сигнализации, поддерживающий такие вещи, как множественныеthreadA::add()
передthreadB::remove()
завершается.
В Visual Studio 2010 добавлены 2 новые библиотеки, которые очень хорошо поддерживают этот сценарий: библиотека асинхронных агентов и библиотека параллельных шаблонов.
Библиотека агентов имеет поддержку или асинхронную передачу сообщений и содержит блоки сообщений для отправки сообщений в "цели" и для получения сообщений из "источников"
Unbounded_buffer - это шаблонный класс, который предлагает то, что, я полагаю, вы ищете:
#include <agents.h>
#include <ppl.h>
#include <iostream>
using namespace ::Concurrency;
using namespace ::std;
int main()
{
//to hold our messages, the buffer is unbounded...
unbounded_buffer<int> buf1;
task_group tasks;
//thread 1 sends messages to the unbounded_buffer
//without blocking
tasks.run([&buf1](){
for(int i = 0 ; i < 10000; ++i)
send(&buf1,i)
//signal exit
send(&buf1,-1);
});
//thread 2 receives messages and blocks if there are none
tasks.run([&buf1](){
int result;
while(result = receive(&buf1)!=-1)
{
cout << "I got a " << result << endl;
}
});
//wait for the threads to end
tasks.wait();
}
Возможно, вы захотите рассмотреть ваши требования - действительно ли это так, что А не может вообще отказаться от каких-либо элементов очереди? Или вы не хотите, чтобы B вытаскивал из очереди два последовательных элемента, из которых не поступали последовательные элементы, потому что это каким-то образом искажало бы последовательность событий?
Например, если это какая-то система регистрации данных, вы (по понятным причинам) не захотите пропусков в записи - но без неограниченной памяти реальность такова, что в каком-то угловом случае где-то вы, вероятно, могли бы переполнить свою очередь.,
В этом случае одно из решений состоит в том, чтобы иметь какой-то особый элемент, который можно поместить в очередь, что представляет собой случай, когда А обнаруживает, что ему нужно отбросить предметы. По сути, вы сохраняете один дополнительный элемент, который в большинстве случаев равен нулю. Каждый раз, когда A идет, чтобы добавить элементы в очередь, если этот дополнительный элемент не является нулевым, это входит. Если A обнаруживает, что в очереди нет места, то он настраивает этот дополнительный элемент, чтобы сказать "эй, очередь была заполнена",
Таким образом, A никогда не блокируется, вы можете отбрасывать элементы, когда система очень занята, но вы не упускаете из виду тот факт, что элементы были отброшены, потому что, как только пространство очереди становится доступным, эта отметка указывает, где данные падение произошло. Затем процесс B делает все, что ему нужно, когда он обнаруживает, что он вытянул этот элемент метки переполнения из очереди.