Многопоточная одиночная читательская очередь fifo

Мне нужна очередь для передачи сообщений из одного потока (A) в другой (B), однако я не смог найти тот, который действительно делает то, что я хочу, так как они обычно позволяют добавить элемент, чтобы потерпеть неудачу, случай, который в моей ситуации в значительной степени фатально, так как сообщение должно быть обработано, и поток действительно не может остановиться и ждать свободного места.

  • Только поток A добавляет элементы, и только поток B читает их
  • Поток A никогда не должен блокироваться, однако поток B не является критичным для производительности, поэтому он может
  • Добавление элементов всегда должно выполняться успешно, поэтому очередь не может иметь верхнего предела размера (за исключением нехватки памяти в системе)
  • Если очередь пуста, поток B должен подождать, пока не появится элемент для обработки

4 ответа

Вот как написать очередь без блокировки в C++:

http://www.ddj.com/hpc-high-performance-computing/210604448

Но когда вы говорите "поток А не должен блокировать", вы уверены, что это требование? Windows не является операционной системой реального времени (как и Linux при обычном использовании). Если вы хотите, чтобы поток А мог использовать всю доступную системную память, он должен выделить память (или подождать, пока кто-то другой это сделает). Сама ОС не может обеспечить гарантии синхронизации лучше, чем те, которые вы имели бы, если бы и читатель, и писатель взяли на себя блокировку в процессе (т. Е. Неразделенный мьютекс) для манипулирования списком. И худший случай добавления сообщения - пойти в ОС, чтобы получить память.

Короче говоря, есть причина, по которой те очереди, которые вам не нравятся, имеют фиксированную емкость - они не должны выделять память в предположительно низко-латентном потоке.

Таким образом, код без блокировок, как правило, будет менее блочным, но из-за выделения памяти это не гарантируется, и производительность с мьютексом не должна быть такой уж плохой, если у вас нет действительно огромного потока событий для процесс (например, вы пишете сетевой драйвер и сообщения являются входящими пакетами Ethernet).

Итак, в псевдокоде первое, что я попробую, будет:

Writer:
    allocate message and fill it in
    acquire lock
        append node to intrusive list
        signal condition variable
    release lock

Reader:
    for(;;)
        acquire lock
            for(;;)
                if there's a node
                    remove it
                    break
                else
                   wait on condition variable
                endif
            endfor
        release lock
        process message
        free message
    endfor

Только если это приведет к недопустимым задержкам в потоке писателя, я перейду к коду без блокировки (если только у меня не оказалось подходящей очереди).

  • Почему бы не использовать STL < list> или < deque> с мьютексом вокруг добавить / удалить? Потокобезопасность STL недостаточна?

  • Почему бы не создать свой собственный (поодиночке / дважды) класс связанного узла-списка, который содержит указатель, и чтобы элементы, которые должны быть добавлены / удалены, наследовали от этого? Таким образом, делая дополнительное распределение ненужным. Вы просто пометите несколько указателей в threadA::add() а также threadB::remove() и вы сделали. (Хотя вы захотите сделать это под мьютексом, эффект блокировки на threadA будет незначительным, если вы не сделаете что-то действительно неправильное...)

  • Если вы используете pthreads, проверьте sem_post() а также sem_wait(), Идея состоит в том, что поток B может блокировать бесконечно через sem_wait() пока threadA не поместит что-то в очередь. Затем ThreadA вызывает sem_post(), Который пробуждает threadB, чтобы сделать это работа. После чего поток B может вернуться в режим сна. Это эффективный способ обработки асинхронной сигнализации, поддерживающий такие вещи, как множественные threadA::add()перед threadB::remove() завершается.

В Visual Studio 2010 добавлены 2 новые библиотеки, которые очень хорошо поддерживают этот сценарий: библиотека асинхронных агентов и библиотека параллельных шаблонов.

Библиотека агентов имеет поддержку или асинхронную передачу сообщений и содержит блоки сообщений для отправки сообщений в "цели" и для получения сообщений из "источников"

Unbounded_buffer - это шаблонный класс, который предлагает то, что, я полагаю, вы ищете:

#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace ::Concurrency;
using namespace ::std;

int main()
{
   //to hold our messages, the buffer is unbounded...
   unbounded_buffer<int> buf1;
   task_group tasks;

   //thread 1 sends messages to the unbounded_buffer
   //without blocking
   tasks.run([&buf1](){
      for(int i = 0 ; i < 10000; ++i)
         send(&buf1,i)
     //signal exit 
     send(&buf1,-1);
   });

   //thread 2 receives messages and blocks if there are none

   tasks.run([&buf1](){
      int result;
      while(result = receive(&buf1)!=-1)
      {
           cout << "I got a " << result << endl;
      }
   });

   //wait for the threads to end
   tasks.wait();
}

Возможно, вы захотите рассмотреть ваши требования - действительно ли это так, что А не может вообще отказаться от каких-либо элементов очереди? Или вы не хотите, чтобы B вытаскивал из очереди два последовательных элемента, из которых не поступали последовательные элементы, потому что это каким-то образом искажало бы последовательность событий?

Например, если это какая-то система регистрации данных, вы (по понятным причинам) не захотите пропусков в записи - но без неограниченной памяти реальность такова, что в каком-то угловом случае где-то вы, вероятно, могли бы переполнить свою очередь.,

В этом случае одно из решений состоит в том, чтобы иметь какой-то особый элемент, который можно поместить в очередь, что представляет собой случай, когда А обнаруживает, что ему нужно отбросить предметы. По сути, вы сохраняете один дополнительный элемент, который в большинстве случаев равен нулю. Каждый раз, когда A идет, чтобы добавить элементы в очередь, если этот дополнительный элемент не является нулевым, это входит. Если A обнаруживает, что в очереди нет места, то он настраивает этот дополнительный элемент, чтобы сказать "эй, очередь была заполнена",

Таким образом, A никогда не блокируется, вы можете отбрасывать элементы, когда система очень занята, но вы не упускаете из виду тот факт, что элементы были отброшены, потому что, как только пространство очереди становится доступным, эта отметка указывает, где данные падение произошло. Затем процесс B делает все, что ему нужно, когда он обнаруживает, что он вытянул этот элемент метки переполнения из очереди.

Другие вопросы по тегам