Как запустить несколько потоков, и каждый поток работает с разными файлами?

У меня есть однопоточное приложение, которое отправляет файл на другой сервер, вызывая send_new_file

void send_new_file_command::start_sending_file()
{
    m_thread = thread(&send_new_file_command::execute_file, this);
}

void send_new_file_command::execute_file()
{
    for (auto it = files_need_to_send.begin(); it != files_need_to_send.end() && !is_complete(); ++it)
    {
        {
            std::unique_lock<spinning_lock> guard(lock_obj);
            m_current_file = *it;
        }
        // send a file.
        // I want to call this in parallel
        send_new_file(*it);
    }
}

Есть ли способ у меня может быть несколько потоков, и каждый поток посылает по одному файлу каждый. В качестве примера, скажем, у нас есть 4 потока, а потоки 1,2,3,4 будут отправлять разные файлы параллельно. Я хочу позвонить send_new_file в параллели?

я использую std::thread, Я смотрел на пример потока о том, как я могу сделать это в C++, но запутался, как я могу разделить количество файлов на поток здесь и убедиться, что каждый поток работает с подмножеством файлов.

  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i)
    threads.push_back(std::thread(send_new_file(*it)));

Мой опыт работы в Java немного сбивает с толку, как это сделать в C++, используя std::thread.

3 ответа

Решение

Это довольно простой подход с использованием рабочей очереди. Вы можете объединить фрагменты кода в отдельную программу. Мы будем использовать следующие стандартные библиотечные заголовки.

#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

Сначала мы определяем функцию, которая принимает одно имя файла и отправляет его туда, куда оно должно идти. Я смоделирую это, просто написав это /dev/null,

void
send_file(const std::string& filename)
{
  std::ifstream istr {};
  std::ofstream ostr {};
  std::string line {};
  istr.exceptions(std::ifstream::badbit);
  ostr.exceptions(std::ofstream::badbit);
  istr.open(filename);
  ostr.open("/dev/null");
  while (std::getline(istr, line))
    ostr << line << '\n';
}

Далее мы определяем функцию, которая принимает указатель на std::vector файлов, которые еще нужно отправить, и еще один указатель на std::mutex это должно защищать этот вектор. Я использую указатели вместо ссылок, потому что это позволяет мне создавать std::threadпроще позже. Вам не нужно делать это, если вам это не нравится.

int
send_files(std::vector<std::string> *const files_p, std::mutex *const mutex_p)
{
  auto count = 0;
  while (true)
    {
      std::string next {};
      {
        const std::unique_lock<std::mutex> lck {*mutex_p};
        if (files_p->empty())  // nothing left to do
          return count;
        next = std::move(files_p->back());
        files_p->pop_back();
      }
      send_file(next);
      count += 1;
    }
}

Важно то, что мы не удерживаем блокировку во время фактической работы по отправке файла. Иначе мы бы полностью убили параллелизм. Я также был осторожен, чтобы не выделять память, удерживая блокировку. Обычно вы увидите std::listиспользуется в качестве рабочих очередей и std::condition_variables, чтобы сигнализировать, когда произошло изменение в очереди. Я разместил код, показывающий это в другом ответе некоторое время назад. Однако в этом простом случае очередь удаляется только из std::vector идеально подходит.

Наконец, мы используем то, что имеем в простой программе, которая создает один поток на аппаратный блок параллелизма и просит эти потоки отправить все файлы, указанные в аргументах командной строки. Обратите внимание, что, как написано, это будет обрабатывать список в обратном порядке. Тем не менее, это легко изменить, если это проблема для вас.

int
main(int argc, char * * argv)
{
  const auto nthreads = std::thread::hardware_concurrency();
  std::mutex mutex {};
  std::vector<std::thread> threads {};
  std::vector<std::string> files {};
  files.reserve(argc - 1);
  for (auto i = 1; i < argc; ++i)
    files.push_back(argv[i]);
  threads.reserve(nthreads);
  for (auto t = 0U; t < nthreads; ++t)
    threads.emplace_back(send_files, &files, &mutex);
  for (auto t = 0U; t < nthreads; ++t)
    threads[t].join();
}

Первый подход

Есть первое простое решение:

  • ваш класс содержит вектор файлов для обработки
  • только один поток управляет этим вектором через функцию execute_file()
  • эта функция создает столько потоков, сколько необходимо, каждый обрабатывает один файл
  • в конце все потоки объединяются (обязательно)

Код будет выглядеть так:

struct send_new_file_command {
    vector<string> files_need_to_send;
public:
    send_new_file_command(vector<string> f) : files_need_to_send(f) {}
    void execute_file();
};
void send_new_file_command::execute_file()
{
    vector<thread> exec;
    for(auto it = files_need_to_send.begin(); it != files_need_to_send.end(); ++it)
    {
        exec.push_back(thread(send_new_file, *it));
    }
    for(auto &e : exec)
        e.join();
}

Код можно протестировать с помощью следующего:

void send_new_file(string x) { // simulator 
    for(int i = 0; i<10; i++) {
        cout << x << endl;
        this_thread::sleep_for(chrono::milliseconds(500));
    }
}
int main() {
    vector<string>vs{"a", "b", "c", "d"};
    send_new_file_command sfc(vs);
    sfc.execute_file();
    return 0;
}

Это решение очень простое. У него есть два основных недостатка:

  • он может запустить гораздо больше потоков, чем может управлять ваше оборудование. Так что только немногие из них действительно работают одновременно.
  • в теме посвящен файлу. Если это короткий файл и поток снова свободен, он не будет использоваться повторно.

Другие решения

Есть много других решений. Например:

  • Вариант этого - запуск фиксированного числа потоков, каждый из которых просматривает вектор файлов для обработки для следующего элемента, как только он будет готов. Затем вам нужно будет ввести сильную блокировку.

  • Вместо использования необработанных потоков, вы могли бы рассмотреть фьючерсы, запуская std::async(std::launch::async, send_new_file, *it);

С точки зрения производительности лучший способ сделать это:

  1. объявить переменную счетчика, используя std::atomic<int>
  2. создать потоки в векторе, массиве, что угодно
  3. вызовите соединение для каждой темы

Затем основная функция потока получает доступ и увеличивает общий счетчик и сохраняет результат в локальной переменной в цикле:

std::atomic<int> counter = 0;
for(int j = 0;j<4;j++)
{
    threads.push_back(std::thread([&](){
        for(int i; (i = counter++) < size;)//the counter variable must be atomic!
        {
            do_work(i);
        }
    }));
}

for(int j = 0;j<4;j++)
    threads[i].join();
Другие вопросы по тегам