Как выполнить итерацию по указателям, повышающим поток
У меня есть многопоточное приложение. Каждый поток инициализирует тип данных struct в своем собственном локальном хранилище. Некоторые элементы добавляются к векторам внутри переменных типа структуры. В конце программы я хотел бы перебрать локальные хранилища этих потоков и сложить все результаты вместе. Как я могу перебрать указатель потока, чтобы я мог добавить все результаты из нескольких потоков вместе?
Заранее спасибо.
boost::thread_specific_ptr<testStruct> tss;
size_t x = 10;
void callable(string str, int x) {
if(!tss.get()){
tss.reset(new testStruct);
(*tss).xInt.resize(x, 0);
}
// Assign some values to the vector elements after doing some calculations
}
Пример:
#include <iostream>
#include <vector>
#include <boost/thread/mutex.hpp>
#include <boost/thread/tss.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#define NR_THREAD 4
#define SAMPLE_SIZE 500
using namespace std;
static bool busy = false;
struct testStruct{
vector<int> intVector;
};
boost::asio::io_service ioService;
boost::thread_specific_ptr<testStruct> tsp;
boost::condition_variable cond;
boost::mutex mut;
void callable(int x) {
if(!tsp.get()){
tsp.reset(new testStruct);
}
(*tsp).intVector.push_back(x);
if (x + 1 == SAMPLE_SIZE){
busy = true;
cond.notify_all();
}
}
int main() {
boost::thread_group threads;
size_t (boost::asio::io_service::*run)() = &boost::asio::io_service::run;
boost::asio::io_service::work work(ioService);
for (short int i = 0; i < NR_THREAD; ++i) {
threads.create_thread(boost::bind(run, &ioService));
}
size_t iterations = 10;
for (int i = 0; i < iterations; i++) {
busy = false;
for (short int j = 0; j < SAMPLE_SIZE; ++j) {
ioService.post(boost::bind(callable, j));
}
// all threads need to finish the job for the next iteration
boost::unique_lock<boost::mutex> lock(mut);
while (!busy) {
cond.wait(lock);
}
cout << "Iteration: " << i << endl;
}
vector<int> sum(SAMPLE_SIZE, 0); // sum up all the values from thread local storages
work.~work();
threads.join_all();
return 0;
}
1 ответ
Итак, после того, как я немного подумал над этой проблемой, я пришел к такому решению:
void accumulateTLS(size_t idxThread){
if (idxThread == nr_threads) // Suspend all the threads till all of them are called and waiting here
{
busy = true;
}
boost::unique_lock<boost::mutex> lock(mut);
while (!busy)
{
cond.wait(lock);
}
// Accumulate the variables using thread specific pointer
cond.notify_one();
}
С помощью boost io_service вызываемая функция может быть изменена после инициализации потоков. Итак, после того, как я выполнил все расчеты, я снова отправляю задания (столько же, сколько и потоков) в службу io с вызываемой функцией bugsulateTLS(idxThread). N заданий отправляются в N потоков, и процесс накопления выполняется внутри метода collectulateTLS.
PS вместо work.~ Work(), следует использовать work.reset().