Как запустить несколько потоков, и каждый поток работает с разными файлами?
У меня есть однопоточное приложение, которое отправляет файл на другой сервер, вызывая send_new_file
void send_new_file_command::start_sending_file()
{
m_thread = thread(&send_new_file_command::execute_file, this);
}
void send_new_file_command::execute_file()
{
for (auto it = files_need_to_send.begin(); it != files_need_to_send.end() && !is_complete(); ++it)
{
{
std::unique_lock<spinning_lock> guard(lock_obj);
m_current_file = *it;
}
// send a file.
// I want to call this in parallel
send_new_file(*it);
}
}
Есть ли способ у меня может быть несколько потоков, и каждый поток посылает по одному файлу каждый. В качестве примера, скажем, у нас есть 4 потока, а потоки 1,2,3,4 будут отправлять разные файлы параллельно. Я хочу позвонить send_new_file
в параллели?
я использую std::thread
, Я смотрел на пример потока о том, как я могу сделать это в C++, но запутался, как я могу разделить количество файлов на поток здесь и убедиться, что каждый поток работает с подмножеством файлов.
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i)
threads.push_back(std::thread(send_new_file(*it)));
Мой опыт работы в Java немного сбивает с толку, как это сделать в C++, используя std::thread.
3 ответа
Это довольно простой подход с использованием рабочей очереди. Вы можете объединить фрагменты кода в отдельную программу. Мы будем использовать следующие стандартные библиотечные заголовки.
#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
Сначала мы определяем функцию, которая принимает одно имя файла и отправляет его туда, куда оно должно идти. Я смоделирую это, просто написав это /dev/null
,
void
send_file(const std::string& filename)
{
std::ifstream istr {};
std::ofstream ostr {};
std::string line {};
istr.exceptions(std::ifstream::badbit);
ostr.exceptions(std::ofstream::badbit);
istr.open(filename);
ostr.open("/dev/null");
while (std::getline(istr, line))
ostr << line << '\n';
}
Далее мы определяем функцию, которая принимает указатель на std::vector
файлов, которые еще нужно отправить, и еще один указатель на std::mutex
это должно защищать этот вектор. Я использую указатели вместо ссылок, потому что это позволяет мне создавать std::thread
проще позже. Вам не нужно делать это, если вам это не нравится.
int
send_files(std::vector<std::string> *const files_p, std::mutex *const mutex_p)
{
auto count = 0;
while (true)
{
std::string next {};
{
const std::unique_lock<std::mutex> lck {*mutex_p};
if (files_p->empty()) // nothing left to do
return count;
next = std::move(files_p->back());
files_p->pop_back();
}
send_file(next);
count += 1;
}
}
Важно то, что мы не удерживаем блокировку во время фактической работы по отправке файла. Иначе мы бы полностью убили параллелизм. Я также был осторожен, чтобы не выделять память, удерживая блокировку. Обычно вы увидите std::list
используется в качестве рабочих очередей и std::condition_variable
s, чтобы сигнализировать, когда произошло изменение в очереди. Я разместил код, показывающий это в другом ответе некоторое время назад. Однако в этом простом случае очередь удаляется только из std::vector
идеально подходит.
Наконец, мы используем то, что имеем в простой программе, которая создает один поток на аппаратный блок параллелизма и просит эти потоки отправить все файлы, указанные в аргументах командной строки. Обратите внимание, что, как написано, это будет обрабатывать список в обратном порядке. Тем не менее, это легко изменить, если это проблема для вас.
int
main(int argc, char * * argv)
{
const auto nthreads = std::thread::hardware_concurrency();
std::mutex mutex {};
std::vector<std::thread> threads {};
std::vector<std::string> files {};
files.reserve(argc - 1);
for (auto i = 1; i < argc; ++i)
files.push_back(argv[i]);
threads.reserve(nthreads);
for (auto t = 0U; t < nthreads; ++t)
threads.emplace_back(send_files, &files, &mutex);
for (auto t = 0U; t < nthreads; ++t)
threads[t].join();
}
Первый подход
Есть первое простое решение:
- ваш класс содержит вектор файлов для обработки
- только один поток управляет этим вектором через функцию
execute_file()
- эта функция создает столько потоков, сколько необходимо, каждый обрабатывает один файл
- в конце все потоки объединяются (обязательно)
Код будет выглядеть так:
struct send_new_file_command {
vector<string> files_need_to_send;
public:
send_new_file_command(vector<string> f) : files_need_to_send(f) {}
void execute_file();
};
void send_new_file_command::execute_file()
{
vector<thread> exec;
for(auto it = files_need_to_send.begin(); it != files_need_to_send.end(); ++it)
{
exec.push_back(thread(send_new_file, *it));
}
for(auto &e : exec)
e.join();
}
Код можно протестировать с помощью следующего:
void send_new_file(string x) { // simulator
for(int i = 0; i<10; i++) {
cout << x << endl;
this_thread::sleep_for(chrono::milliseconds(500));
}
}
int main() {
vector<string>vs{"a", "b", "c", "d"};
send_new_file_command sfc(vs);
sfc.execute_file();
return 0;
}
Это решение очень простое. У него есть два основных недостатка:
- он может запустить гораздо больше потоков, чем может управлять ваше оборудование. Так что только немногие из них действительно работают одновременно.
- в теме посвящен файлу. Если это короткий файл и поток снова свободен, он не будет использоваться повторно.
Другие решения
Есть много других решений. Например:
Вариант этого - запуск фиксированного числа потоков, каждый из которых просматривает вектор файлов для обработки для следующего элемента, как только он будет готов. Затем вам нужно будет ввести сильную блокировку.
Вместо использования необработанных потоков, вы могли бы рассмотреть фьючерсы, запуская
std::async(std::launch::async, send_new_file, *it);
С точки зрения производительности лучший способ сделать это:
- объявить переменную счетчика, используя
std::atomic<int>
- создать потоки в векторе, массиве, что угодно
- вызовите соединение для каждой темы
Затем основная функция потока получает доступ и увеличивает общий счетчик и сохраняет результат в локальной переменной в цикле:
std::atomic<int> counter = 0;
for(int j = 0;j<4;j++)
{
threads.push_back(std::thread([&](){
for(int i; (i = counter++) < size;)//the counter variable must be atomic!
{
do_work(i);
}
}));
}
for(int j = 0;j<4;j++)
threads[i].join();