Параллельно for_each более чем в два раза медленнее, чем std::for_each

Я читаю C++ Concurrency in Action Энтони Уильямса. В главе о проектировании параллельного кода есть параллельная версия std::for_each algorihtm. Вот немного измененный код из книги:

join_thread.hpp

#pragma once

#include <vector>
#include <thread>

class join_threads
{
public:
  explicit join_threads(std::vector<std::thread>& threads)
    : threads_(threads) {}

  ~join_threads()
  {
    for (size_t i = 0; i < threads_.size(); ++i)
    {
      if(threads_[i].joinable())
      {
        threads_[i].join();
      }
    }
  }

private:
  std::vector<std::thread>& threads_;
};

parallel_for_each.hpp

#pragma once

#include <future>
#include <algorithm>

#include "join_threads.hpp"

template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func func)
{
  const auto length = std::distance(first, last);
  if (0 == length) return;

  const auto min_per_thread = 25u;
  const unsigned max_threads = (length + min_per_thread - 1) / min_per_thread;

  const auto hardware_threads = std::thread::hardware_concurrency();

  const auto num_threads= std::min(hardware_threads != 0 ?
        hardware_threads : 2u, max_threads);

  const auto block_size = length / num_threads;

  std::vector<std::future<void>> futures(num_threads - 1);
  std::vector<std::thread> threads(num_threads-1);
  join_threads joiner(threads);

  auto block_start = first;
  for (unsigned i = 0; i < num_threads - 1; ++i)
  {
    auto block_end = block_start;
    std::advance(block_end, block_size);
    std::packaged_task<void (void)> task([block_start, block_end, func]()
    {
      std::for_each(block_start, block_end, func);
    });
    futures[i] = task.get_future();
    threads[i] = std::thread(std::move(task));
    block_start = block_end;
  }

  std::for_each(block_start, last, func);

  for (size_t i = 0; i < num_threads - 1; ++i)
  {
    futures[i].get();
  }
}

Я сравнил его с последовательной версией std::for_each, используя следующую программу:

main.cpp

#include <iostream>
#include <random>
#include <chrono>

#include "parallel_for_each.hpp"

using namespace std;

constexpr size_t ARRAY_SIZE = 500'000'000;
typedef std::vector<uint64_t> Array;

template <class FE, class F>
void test_for_each(const Array& a, FE fe, F f, atomic<uint64_t>& result)
{
  auto time_begin = chrono::high_resolution_clock::now();
  result = 0;
  fe(a.begin(), a.end(), f);
  auto time_end = chrono::high_resolution_clock::now();

  cout << "Result = " << result << endl;
  cout << "Time: " << chrono::duration_cast<chrono::milliseconds>(
            time_end - time_begin).count() << endl;
}

int main()
{
  random_device device;
  default_random_engine engine(device());
  uniform_int_distribution<uint8_t> distribution(0, 255);

  Array a;
  a.reserve(ARRAY_SIZE);

  cout << "Generating array ... " << endl;
  for (size_t i = 0; i < ARRAY_SIZE; ++i)
    a.push_back(distribution(engine));

  atomic<uint64_t> result;
  auto acc = [&result](uint64_t value) { result += value; };

  cout << "parallel_for_each ..." << endl;
  test_for_each(a, parallel_for_each<Array::const_iterator, decltype(acc)>, acc, result);
  cout << "for_each ..." << endl;
  test_for_each(a, for_each<Array::const_iterator, decltype(acc)>, acc, result);

  return 0;
}

Параллельная версия алгоритма на моей машине более чем в два раза медленнее, чем последовательная:

parallel_for_each ...
Result = 63750301073
Time: 5448
for_each ...
Result = 63750301073
Time: 2496

Я использую компилятор GCC 6.2 в Ubuntu Linux, работающем на процессоре Intel® Core iM- i61-6100 с тактовой частотой 3,70 ГГц.

Как такое поведение можно объяснить? Это из-за обмена atomic<uint64_t> переменная между потоками и кеш пинг-понга?

Я профилировал оба отдельно с перфом. Для параллельной версии статистика следующая:

 1137982167      cache-references                                            
  247652893      cache-misses              #   21,762 % of all cache refs    
60868183996      cycles                                                      
27409239189      instructions              #    0,45  insns per cycle        
 3287117194      branches                                                    
      80895      faults                                                      
          4      migrations

И для очередного:

  402791485      cache-references                                            
  246561299      cache-misses              #   61,213 % of all cache refs    
40284812779      cycles                                                      
26515783790      instructions              #    0,66  insns per cycle
 3188784664      branches                                                    
      48179      faults
          3      migrations

Очевидно, что параллельная версия генерирует гораздо больше ссылок на кэш, циклов и ошибок, но почему?

1 ответ

Решение

Вы делитесь тем же result переменная: все потоки накапливаются на atomic<uint64_t> result молотить кеш!

Каждый раз, когда поток пишет result все кэши в других ядрах становятся недействительными: это приводит к конфликту строк кэша.

Дополнительная информация:

  • "Разделение является корнем всех споров".

    [...] для записи в ячейку памяти ядро ​​должно дополнительно иметь исключительное право собственности на строку кэша, содержащую эту ячейку. В то время как одно ядро ​​используется исключительно, все остальные ядра, пытающиеся записать одну и ту же область памяти, должны подождать и по очереди - то есть они должны работать последовательно. Концептуально это выглядит так, как если бы каждая строка кэша была защищена аппаратным мьютексом, где только одно ядро ​​может одновременно удерживать аппаратную блокировку этой строки кэша.

  • Эта статья о "ложном обмене", в которой описана похожая проблема, более подробно объясняет, что происходит в кэшах.


Я внес некоторые изменения в вашу программу и достиг следующих результатов (на машине с i7-4770K [8 потоков + гиперпоточность]):

Generating array ...
parallel_for_each ...
Result = 63748111806
Time: 195
for_each ...
Result = 63748111806
Time: 2727

Параллельная версия примерно на 92% быстрее, чем последовательная версия.


  1. std::future а также std::packaged_task являются тяжеловесными абстракциями. В этом случае std::experimental::latch достаточно.

  2. Каждая задача отправляется в пул потоков Это минимизирует накладные расходы на создание потока.

  3. Каждое задание имеет свой собственный аккумулятор. Это исключает обмен.

Код доступен здесь на моем GitHub. Он использует некоторые личные зависимости, но вы должны понимать изменения независимо.


Вот самые важные изменения:

// A latch is being used instead of a vector of futures.
ecst::latch l(num_threads - 1);

l.execute_and_wait_until_zero([&]
{
    auto block_start = first;
    for (unsigned i = 0; i < num_threads - 1; ++i)
    {
        auto block_end = block_start;
        std::advance(block_end, block_size);

        // `p` is a thread pool.
        // Every task posted in the thread pool has its own `tempacc` accumulator.
        p.post([&, block_start, block_end, tempacc = 0ull]() mutable
        {
            // The task accumulator is filled up...
            std::for_each(block_start, block_end, [&tempacc](auto x){ tempacc += x; });

            // ...and then the atomic variable is incremented ONCE.
            func(tempacc);
            l.decrement_and_notify_all();
        });

        block_start = block_end;
    }

    // Same idea here: accumulate to local non-atomic counter, then
    // add the partial result to the atomic counter ONCE.
    auto tempacc2 = 0ull;
    std::for_each(block_start, last, [&tempacc2](auto x){ tempacc2 += x; });
    func(tempacc2);
});
Другие вопросы по тегам