Как правильно выйти из std::thread, который может ожидать переменную std::condition_variable?
У меня есть класс, который реализует многопоточную систему производителя / потребителя, использующую мьютекс и две условные переменные для синхронизации. Производитель сигнализирует потоку потребителя, когда есть элементы для использования, а потребитель сигнализирует потоку производителя, когда он потребляет элементы. Потоки продолжают генерировать и потреблять, пока деструктор не попросит их выйти, установив логическую переменную. Поскольку любой из потоков может ожидать переменную условия, я должен реализовать вторую проверку переменной quit, которая кажется неправильной и грязной...
Я сократил проблему до следующего (работающего на GNU/Linux с g++4.7) примера:
// C++11and Boost required.
#include <cstdlib> // std::rand()
#include <cassert>
#include <boost/circular_buffer.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
// Creates a single producer and single consumer thread.
class prosumer
{
public:
// Create the circular buffer and start the producer and consumer thread.
prosumer()
: quit_{ false }
, buffer_{ circular_buffer_capacity }
, producer_{ &prosumer::producer_func, this }
, consumer_{ &prosumer::consumer_func, this }
{}
// Set the quit flag and wait for the threads to exit.
~prosumer()
{
quit_ = true;
producer_.join();
consumer_.join();
}
private:
// Thread entry point for the producer.
void producer_func()
{
// Value to add to the ringbuffer to simulate data.
int counter = 0;
while ( quit_ == false )
{
// Simulate the production of some data.
std::vector< int > produced_items;
const auto items_to_produce = std::rand() % circular_buffer_capacity;
for ( int i = 0; i < items_to_produce; ++i )
{
produced_items.push_back( ++counter );
}
// Get a lock on the circular buffer.
std::unique_lock< std::mutex > lock( buffer_lock_ );
// Wait for the buffer to be emptied or the quit flag to be set.
buffer_is_empty_.wait( lock, [this]()
{
return buffer_.empty() == true || quit_ != false;
} );
// Check if the thread was requested to quit.
if ( quit_ != false )
{
// Don't let the consumer deadlock.
buffer_has_data_.notify_one();
break;
}
// The buffer is locked by this thread. Put the data into it.
buffer_.insert( std::end( buffer_ ), std::begin( produced_items ), std::end( produced_items ) );
// Notify the consumer that the buffer has some data in it.
buffer_has_data_.notify_one();
}
std::cout << "producer thread quit\n";
}
// Thread entry for the consumer.
void consumer_func()
{
int counter_check = 0;
while ( quit_ == false )
{
std::unique_lock< std::mutex > lock( buffer_lock_ );
// Wait for the buffer to have some data before trying to read from it.
buffer_has_data_.wait( lock, [this]()
{
return buffer_.empty() == false || quit_ != false;
} );
// Check if the thread was requested to quit.
if ( quit_ != false )
{
// Don't let the producer deadlock.
buffer_is_empty_.notify_one();
break;
}
// The buffer is locked by this thread. Simulate consuming the data.
for ( auto i : buffer_ ) assert( i == ++counter_check );
buffer_.clear();
// Notify the producer thread that the buffer is empty.
buffer_is_empty_.notify_one();
}
std::cout << "consumer thread quit\n";
}
// How many items the circular buffer can hold.
static const int circular_buffer_capacity = 64;
// Flag set in the destructor to signal the threads to stop.
std::atomic_bool quit_;
// Circular buffer to hold items and a mutex for synchronization.
std::mutex buffer_lock_;
boost::circular_buffer< int > buffer_;
// Condition variables for the threads to signal each other.
std::condition_variable buffer_has_data_;
std::condition_variable buffer_is_empty_;
std::thread producer_;
std::thread consumer_;
};
int main( int argc, char **argv )
{
(void)argc; (void) argv;
prosumer test;
// Let the prosumer work for a little while.
std::this_thread::sleep_for( std::chrono::seconds( 3 ) );
return EXIT_SUCCESS;
}
Если вы посмотрите на функции потока provider_func и consumer_func, то увидите, что они зацикливаются до тех пор, пока деструктор prosumer не установит переменную quit, но они также проверяют переменную quit снова после блокировки кольцевого буфера. Если переменная quit была установлена, они сигнализируют друг другу, чтобы предотвратить взаимоблокировку.
У меня была еще одна идея - вызвать notify_one() для условных переменных деструктора. Это будет лучшим решением?
Есть лучший способ сделать это?
Обновление 1: я забыл упомянуть, что в этом случае, когда потоки запрашиваются для выхода, потребителю не нужно потреблять оставшиеся данные в циклическом буфере, и это нормально, если производитель также производит немного больше. Пока они оба выйдут и не зайдут в тупик, все будет хорошо.
2 ответа
По моему мнению, вызов notify_one (или, скорее, notify_all, если вы хотите расширить свой буфер для нескольких производителей / потребителей) для обеих условных переменных в деструкторе перед вызовами присоединения будет предпочтительным решением по нескольким причинам:
Во-первых, это соответствует тому, как обычно используются условные переменные: устанавливая quit_, вы изменяете состояние, в котором интересуют потоки производителя / потребителя, и ожидаете их, поэтому вы должны уведомить их об изменении состояния.
Кроме того, notify_one не должна быть очень дорогостоящей операцией.
Кроме того, в более реалистичном приложении может иметь место задержка между производством двух элементов; в этом случае вы можете не захотеть блокировать свой деструктор, пока потребитель не заметит, что он должен отменить, как только следующий элемент будет поставлен в очередь; в примере кода это не происходит, насколько я вижу.
На мой взгляд, есть две функции, которые можно разделить:
- передача и отправка сообщений
- производить и потреблять
Имеет смысл действительно разделять их: поток "работника" не делает ничего, кроме обработки "сообщений", которые могут означать "выход" или "do_work".
Таким образом, вы можете создать универсальный "рабочий" класс, который агрегирует фактическую функцию. produce
а также consume
методы остаются чистыми, а worker
класс заботится только о продолжении работы.