Как искать в следующей доступной теме, чтобы сделать вычисление
Я делаю многопоточность в C++. Это может быть что-то очень стандартное, но я не могу найти это нигде или узнать какие-либо ключевые термины для поиска в Интернете.
Я хочу делать какие-то вычисления много раз, но с несколькими потоками. Для каждой итерации вычислений я хочу найти следующий доступный поток, который завершил свое предыдущее вычисление, чтобы выполнить следующую итерацию. Я не хочу перебирать потоки по порядку, так как следующий вызываемый поток еще не закончил свою работу.
Например, предположим, что у меня есть вектор типа int, и я хочу суммировать итоговую сумму с 5 потоками. У меня есть где-то обновляемая общая сумма, хранящаяся где-то, и количество элементов, для которых я в данный момент занят. Каждый поток просматривает счетчик, чтобы увидеть следующую позицию, а затем берет это векторное значение и добавляет его к общей сумме. Затем он возвращается к поиску счета для следующей итерации. Таким образом, для каждой итерации приращения счетчика затем ищут следующий доступный поток (может быть, тот, который уже ожидает счетчика, или, возможно, они все заняты, все еще работая), чтобы выполнить следующую итерацию. Мы не увеличиваем количество потоков, но я хочу иметь возможность каким-то образом искать во всех 5 потоках первый, который завершает следующие вычисления.
Как бы я пошел о кодировании этого. Все, что я знаю, включает в себя выполнение цикла через потоки, так что я не могу проверить следующий доступный, который может быть не в порядке.
2 ответа
Используйте semafor (или mutex, всегда смешивайте эти два) в глобальной переменной, сообщающей вам, что будет дальше. Semafor будет блокировать другие потоки, пока вы обращаетесь к переменной, делая эти потоки доступными.
Итак, если у вас есть массив элементов X. И глобал с именем nextfree witch инициализируется в 0, тогда псевдокод будет выглядеть так:
while (1)
{
<lock semafor INT>
if (nextfree>=X)
{
<release semnafor INT>
<exit and terminate thread>
}
<Get the data based on "nextfree">
nextfree++;
<release semafor INT>
<do your stuff withe the chunk you got>
}
Дело в том, что каждый поток будет один и будет иметь эксклюзивный доступ к структуре данных в пределах блокировки semafor и, следовательно, может получить доступ к следующему доступному независимо от того, что делают другие. (Другие потоки должны будут ждать в очереди, если они закончили, в то время как другой поток работает над получением следующего блока данных. Когда вы освобождаете только ОДИН, который стоит в очереди, будет доступ. Остальным придется ждать.)
Есть некоторые вещи, которые нужно остерегаться. Semafor может заблокировать вашу систему, если вам удастся выйти в неправильном положении (без освобождения) или создать тупик.
Это пул потоков:
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back( T t ) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); } );
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
struct thread_pool {
thread_pool( std::size_t n = 1 ) { start_thread(n); }
thread_pool( thread_pool&& ) = delete;
thread_pool& operator=( thread_pool&& ) = delete;
~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
template<class F, class R=std::result_of_t<F&()>>
std::future<R> queue_task( F task ) {
std::packaged_task<R()> p(std::move(task));
auto r = p.get_future();
tasks.push_back( std::move(p) );
return r;
}
template<class F, class R=std::result_of_t<F&()>>
std::future<R> run_task( F task ) {
if (threads_active() >= total_threads()) {
start_thread();
}
return queue_task( std::move(task) );
}
void terminate() {
tasks.terminate();
}
std::size_t threads_active() const {
return active;
}
std::size_t total_threads() const {
return threads.size();
}
void clear_threads() {
terminate();
threads.clear();
}
void start_thread( std::size_t n = 1 ) {
while(n-->0) {
threads.push_back(
std::async( std::launch::async,
[this]{
while(auto task = tasks.pop_front()) {
++active;
try{
(*task)();
} catch(...) {
--active;
throw;
}
--active;
}
}
)
);
}
}
private:
std::vector<std::future<void>> threads;
threaded_queue<std::packaged_task<void()>> tasks;
std::atomic<std::size_t> active;
};
Вы даете сколько ниток либо на стройке, либо через start_thread
,
Ты тогда queue_task
, Это возвращает std::future
это говорит вам, когда задача выполнена.
Когда потоки завершают задачу, они переходят к threaded_queue
и искать больше.
Когда threaded_queue
уничтожен, он отменяет все данные в нем.
Когда thread_pool
уничтожается, отменяет все будущие задачи, а затем ожидает завершения всех нерешенных задач.