Реализация condition_variable для решения многопоточного режима ожидания

Моя программа печатает несколько строк текста на консоли с помощью свободных рабочих потоков. Проблема, однако, заключается в том, что рабочие не ждут, пока предыдущие рабочие закончат работу, прежде чем печатать текст, в результате чего текст будет вставлен в текст другого рабочего потока, как показано на рисунке ниже:

Мне нужно исправить эту проблему, известную как проблема занятого ожидания, с помощью std::condition_variable. Я попытался реализовать условную переменную в приведенном ниже коде, основываясь на примере, найденном по этой ссылке, и следующий вопрос stackru помог мне, но не достаточно, из-за моего ограниченного знания C++ в целом. Так что, в конце концов, я только прокомментировал все назад, и теперь я в растерянности.

// threadpool.cpp
// Compile with:
// g++ -std=c++11 -pthread threadpool.cpp -o threadpool

#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <deque>

class ThreadPool; // forward declare
//std::condition_variable cv;
//bool ready = false;
//bool processed = false;

class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};

class ThreadPool {
public:
    ThreadPool(size_t threads);
    template<class F> void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    std::vector<std::thread> workers;
    std::deque<std::function<void()>> tasks;

    std::mutex queue_mutex;
    bool stop;
};

void Worker::operator()()
{
    std::function<void()> task;
    while (true)
    {
        std::unique_lock<std::mutex> locker(pool.queue_mutex);
        //cv.wait(locker, [] {return ready; });

        if (pool.stop) return;
        if (!pool.tasks.empty())
        {
            task = pool.tasks.front();
            pool.tasks.pop_front();
            locker.unlock();
            //cv.notify_one();
            //processed = true;
            task();
        }
        else {
            locker.unlock();
            //cv.notify_one();
        }
    }
}

ThreadPool::ThreadPool(size_t threads) : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.push_back(std::thread(Worker(*this)));
}

ThreadPool::~ThreadPool()
{
    stop = true; // stop all threads

    for (auto &thread : workers)
        thread.join();
}

template<class F>
void ThreadPool::enqueue(F f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    //cv.wait(lock, [] { return processed; });
    tasks.push_back(std::function<void()>(f));
    //ready = true;
}

int main()
{
    ThreadPool pool(4);

    for (int i = 0; i < 8; ++i) pool.enqueue([i]() { std::cout << "Text printed by worker " << i << std::endl; });

    std::cin.ignore();
    return 0;
}

2 ответа

Решение

Вот рабочий образец:

// threadpool.cpp
// Compile with:
// g++ -std=c++11 -pthread threadpool.cpp -o threadpool

#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <deque>
#include <atomic>

class ThreadPool; 

// forward declare
std::condition_variable ready_cv;
std::condition_variable processed_cv;
std::atomic<bool> ready(false);
std::atomic<bool> processed(false);

class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};

class ThreadPool {
public:
    ThreadPool(size_t threads);
    template<class F> void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    std::vector<std::thread> workers;
    std::deque<std::function<void()>> tasks;

    std::mutex queue_mutex;
    bool stop;
};

void Worker::operator()()
{
    std::function<void()> task;

    // in real life you need a variable here like while(!quitProgram) or your
    // program will never return. Similarly, in real life always use `wait_for`
    // instead of `wait` so that periodically you check to see if you should
    // exit the program
    while (true)
    {
        std::unique_lock<std::mutex> locker(pool.queue_mutex);
        ready_cv.wait(locker, [] {return ready.load(); });

        if (pool.stop) return;
        if (!pool.tasks.empty())
        {
            task = pool.tasks.front();
            pool.tasks.pop_front();
            locker.unlock();
            task();
            processed = true;
            processed_cv.notify_one();
        }
    }
}

ThreadPool::ThreadPool(size_t threads) : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.push_back(std::thread(Worker(*this)));
}

ThreadPool::~ThreadPool()
{
    stop = true; // stop all threads

    for (auto &thread : workers)
        thread.join();
}

template<class F>
void ThreadPool::enqueue(F f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    tasks.push_back(std::function<void()>(f));
    processed = false;
    ready = true;
    ready_cv.notify_one();
    processed_cv.wait(lock, [] { return processed.load(); });
}

int main()
{
    ThreadPool pool(4);

    for (int i = 0; i < 8; ++i) pool.enqueue([i]() { std::cout << "Text printed by worker " << i << std::endl; });

    std::cin.ignore();
    return 0;
}

Выход:

Text printed by worker 0 
Text printed by worker 1 
Text printed by worker 2 
Text printed by worker 3 
Text printed by worker 4 
Text printed by worker 5 
Text printed by worker 6 
Text printed by worker 7

Почему бы не сделать это в рабочем коде

Поскольку задание состоит в том, чтобы печатать строки по порядку, этот код на самом деле не может быть распараллелен, и поэтому мы придумали способ заставить его работать полностью последовательно, используя требуемый Золотой молоток std::condition_variable, Но, по крайней мере, мы избавились от этого чертовски занятого ожидания!

В реальном примере вы захотите обрабатывать данные или выполнять задачи параллельно и синхронизировать только вывод, и эта структура все еще не является правильным подходом, если бы вы делали это с нуля.

Что я изменил и почему

Я использовал атомарные bools для условий, потому что они имеют детерминированное поведение при совместном использовании несколькими потоками. Не обязательно во всех случаях, но, тем не менее, это хорошая практика.

Вы должны включить условие выхода в while(true) цикл (например, флаг, который устанавливается SIGINT обработчик или что-то), или ваша программа никогда не выйдет. Это просто задание, поэтому мы его пропустили, но это очень важно, чтобы не пренебрегать в производственном коде.

Может быть, назначение можно было бы решить с помощью одной условной переменной, но я не уверен в этом, и в любом случае лучше использовать две, потому что это намного более понятно и понятно для каждого. В основном, мы ждем задачу, затем просим, ​​чтобы enqueuer подождал, пока она не была выполнена, затем скажите, что она действительно обработана, мы готовы к следующей. Сначала вы были на правильном пути, но я думаю, что с двумя резюме стало более очевидно, что происходит не так.

Кроме того, важно установить условие Vars (ready а также processed) перед использованием notify(),

я удалил locker.unlock() потому что дело не нужно. Блокировки C++ std являются структурами RAII, и поэтому блокировка будет разблокирована, когда она выйдет из области видимости, что по сути является следующей строкой. В общем, лучше избегать бессмысленных ветвлений, поскольку ваша программа излишне полна состояния.

Педагогическая напыщенная речь...

Теперь, когда эта проблема была решена и решена, у меня есть несколько вещей, которые, я думаю, нужно сказать о задании в целом, и которые, я думаю, вероятно, будут важнее для вашего обучения, чем решение проблемы, как указано.

Если вы были смущены или разочарованы этим заданием, тогда хорошо, вы должны быть. Имеет смысл, что вам трудно вставить квадратный колышек в круглое отверстие, и я думаю, что реальная ценность этой проблемы - научиться определять, когда вы используете правильный инструмент для правильной работы, а когда нет.,

Условные переменные являются правильным инструментом для решения проблемы занятого цикла, однако это назначение (как указано @nm) является простым условием гонки. Тем не менее, это всего лишь простое условие гонки, поскольку оно включает в себя ненужный и плохо реализованный пул потоков, что делает проблему сложной и трудной для понимания без какой-либо цели. И это сказал, std::async в любом случае следует отдавать предпочтение перед ручным пулом потоков в современном C++ (это проще для правильной реализации и более производительно на многих платформах, и для этого не требуется куча глобальных и синглтонов и исключительно выделенных ресурсов).

Если бы это было задание от вашего босса, а не от профессора, вот что вы бы сделали:

for(int i = 0; i < 8; ++i)
{
    std::cout << "Text printed by worker " << i << std::endl;
}

Эта проблема решается (оптимально) простым for петля. Проблемы с занятым ожиданием / блокировкой являются результатом ужасного дизайна, и "правильное" решение - исправить конструкцию, а не перевязать ее. Я даже не думаю, что это задание поучительно, потому что нет никакой возможности или причины распараллелить вывод, так что это просто приводит в замешательство всех, включая сообщество SO. Похоже на негативное обучение, что потоки просто вносят ненужную сложность, не улучшая вычисления.

Трудно сказать, действительно ли профессор хорошо понимает концепции потоков и переменных условий из структуры задания. Задания по необходимости должны быть сведены к минимуму, упрощены и несколько упрощены для целей обучения, но на самом деле это противоположно тому, что было сделано здесь, где сложная проблема была сделана из простой.

Как правило, я никогда не отвечаю на вопросы, связанные с домашним заданием по SO, потому что я думаю, что раздавать ответы мешает обучению, и что наиболее ценный навык разработчика - это научиться биться головой о стену, пока идея не натолкнется на нее. Тем не менее, нет ничего, кроме отрицательного обучения, которое можно получить от надуманных заданий, подобных этому, и хотя в школе вы должны играть по правилам профессора, важно научиться распознавать надуманные проблемы, когда вы их видите, разбирать их и приходить к простое и правильное решение.

Я думаю, что это нормально, так как мьютекс не заблокирован перед печатью. Для каждого поворота в цикле нет гарантии, что я буду напечатан раньше, чем i+1.

Для хорошего приоритета печати вы должны отображать сообщения после блокировки мьютекса в очереди функций.

Другие вопросы по тегам