Работа с условиями гонки Boost Threads в C++

У меня в приложении постоянно запущено 6 потоков. Сценарий таков:

Один поток непрерывно получает сообщения и вставляет их в очередь сообщений. Другие 4 потока могут рассматриваться как рабочие, которые постоянно извлекают сообщения из очереди и обрабатывают их. Другой последний поток заполняет аналитическую информацию.

Проблема:

Теперь длительность сна для получения потока сообщений составляет 100 мс. Рабочая нить составляет 200 мс. Когда я запустил это приложение, поток извлечения сообщений взял на себя управление и вставил в очередь, таким образом увеличивая кучу. Рабочие потоки не получают возможности обрабатывать сообщения и освобождать их. Наконец это приводит к нехватке памяти.

Как управлять сценарием такого рода, чтобы обеспечить одинаковую возможность для потока извлечения сообщений и рабочего потока.

Заранее спасибо:)

4 ответа

Решение

Ваш вопрос немного расплывчат, поэтому я могу дать вам следующие рекомендации вместо кода:

  1. Защитите взаимные данные с Mutex. В проблеме многопоточного производителя - потребителя обычно существует условие гонки на взаимных данных (сообщение в вашей программе). Один поток пытается выполнить запись в ячейку взаимной памяти, а другой - в том же месте. Сообщение, прочитанное читателем, может быть повреждено, потому что автор написал его в середине процесса чтения. Вы можете заблокировать место взаимной памяти с помощью Mutex. Каждый из потоков должен получить эту блокировку, чтобы иметь возможность читать или изменять взаимные данные. Таким образом, потребительский процесс будет абсолютно уверен, что данные не были изменены. Однако вы должны заметить, что получение этой блокировки может сдерживать поток производителя, поэтому вы должны снять блокировку как можно скорее.
  2. Используйте условные переменные для уведомления потоков потребителей. Если вы не используете механизмы уведомлений, все потребительские потоки должны активно проверять создание данных, которые будут использовать системные ресурсы. Потоки-потребители должны легко перейти в спящий режим, зная, что поток-производитель уведомит их, когда сообщение будет готово.

Библиотека потоков в C++ 11 имеет все необходимое для реализации приложения-производителя. Однако, если вы не можете обновить ваш компилятор, вы также можете использовать библиотеку Boost Threading.

Вам нужно добавить противодавление в поток вашего производителя. Обычно это делается с помощью блокировки очередей потребитель-производитель. Производитель добавляет элементы в очередь, потребители вытесняют элементы из очереди и обрабатывают их. Если очередь пуста, потребители блокируются, пока производитель не добавит что-либо в очередь. Если очередь заполнена, производитель блокируется, пока потребители не получат элементы из очереди.

Одна из систем управления потоком, которую я часто использую, заключается в создании большого пула объектов сообщений при запуске и никогда больше не создавать. Объекты * хранятся в поточно-ориентированном, блокирующем "очередь пула" и распространяются вокруг, извлекаются из пула производителем / производителями, помещаются в очередь к потребителю / в других блокирующих очередях, а затем возвращаются в очередь пула при "потреблении",

Это ограничивает использование памяти, обеспечивает управление потоком (если пул очищается, производитель / блоки блокируют его до тех пор, пока сообщения не будут возвращены потребителями) и исключает постоянное появление новых /delete/malloc/free. Более сложные и более медленные ограниченные очереди не нужны, и все очереди должны быть достаточно большими, чтобы содержать (известное) максимальное количество сообщений.

Использование "классических" очередей блокировки не требует никаких вызовов Sleep().

Вы хотите использовать ограниченную очередь, которая при заполнении блокирует потоки, пытающиеся поставить в очередь, пока не будет доступно больше места.

Вы можете использовать concurrent_bounded_queue из tbb или просто использовать семафор, инициализированный до максимального размера очереди, и уменьшать при постановке в очередь и увеличивать при очереди. boost::thread не предоставляет семафоров изначально, но вы можете реализовать его, используя блокировки и условные переменные.

Другие вопросы по тегам