Параллельно for_each более чем в два раза медленнее, чем std::for_each
Я читаю C++ Concurrency in Action Энтони Уильямса. В главе о проектировании параллельного кода есть параллельная версия std::for_each algorihtm. Вот немного измененный код из книги:
join_thread.hpp
#pragma once
#include <vector>
#include <thread>
class join_threads
{
public:
explicit join_threads(std::vector<std::thread>& threads)
: threads_(threads) {}
~join_threads()
{
for (size_t i = 0; i < threads_.size(); ++i)
{
if(threads_[i].joinable())
{
threads_[i].join();
}
}
}
private:
std::vector<std::thread>& threads_;
};
parallel_for_each.hpp
#pragma once
#include <future>
#include <algorithm>
#include "join_threads.hpp"
template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func func)
{
const auto length = std::distance(first, last);
if (0 == length) return;
const auto min_per_thread = 25u;
const unsigned max_threads = (length + min_per_thread - 1) / min_per_thread;
const auto hardware_threads = std::thread::hardware_concurrency();
const auto num_threads= std::min(hardware_threads != 0 ?
hardware_threads : 2u, max_threads);
const auto block_size = length / num_threads;
std::vector<std::future<void>> futures(num_threads - 1);
std::vector<std::thread> threads(num_threads-1);
join_threads joiner(threads);
auto block_start = first;
for (unsigned i = 0; i < num_threads - 1; ++i)
{
auto block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<void (void)> task([block_start, block_end, func]()
{
std::for_each(block_start, block_end, func);
});
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task));
block_start = block_end;
}
std::for_each(block_start, last, func);
for (size_t i = 0; i < num_threads - 1; ++i)
{
futures[i].get();
}
}
Я сравнил его с последовательной версией std::for_each, используя следующую программу:
main.cpp
#include <iostream>
#include <random>
#include <chrono>
#include "parallel_for_each.hpp"
using namespace std;
constexpr size_t ARRAY_SIZE = 500'000'000;
typedef std::vector<uint64_t> Array;
template <class FE, class F>
void test_for_each(const Array& a, FE fe, F f, atomic<uint64_t>& result)
{
auto time_begin = chrono::high_resolution_clock::now();
result = 0;
fe(a.begin(), a.end(), f);
auto time_end = chrono::high_resolution_clock::now();
cout << "Result = " << result << endl;
cout << "Time: " << chrono::duration_cast<chrono::milliseconds>(
time_end - time_begin).count() << endl;
}
int main()
{
random_device device;
default_random_engine engine(device());
uniform_int_distribution<uint8_t> distribution(0, 255);
Array a;
a.reserve(ARRAY_SIZE);
cout << "Generating array ... " << endl;
for (size_t i = 0; i < ARRAY_SIZE; ++i)
a.push_back(distribution(engine));
atomic<uint64_t> result;
auto acc = [&result](uint64_t value) { result += value; };
cout << "parallel_for_each ..." << endl;
test_for_each(a, parallel_for_each<Array::const_iterator, decltype(acc)>, acc, result);
cout << "for_each ..." << endl;
test_for_each(a, for_each<Array::const_iterator, decltype(acc)>, acc, result);
return 0;
}
Параллельная версия алгоритма на моей машине более чем в два раза медленнее, чем последовательная:
parallel_for_each ...
Result = 63750301073
Time: 5448
for_each ...
Result = 63750301073
Time: 2496
Я использую компилятор GCC 6.2 в Ubuntu Linux, работающем на процессоре Intel® Core iM- i61-6100 с тактовой частотой 3,70 ГГц.
Как такое поведение можно объяснить? Это из-за обмена atomic<uint64_t>
переменная между потоками и кеш пинг-понга?
Я профилировал оба отдельно с перфом. Для параллельной версии статистика следующая:
1137982167 cache-references
247652893 cache-misses # 21,762 % of all cache refs
60868183996 cycles
27409239189 instructions # 0,45 insns per cycle
3287117194 branches
80895 faults
4 migrations
И для очередного:
402791485 cache-references
246561299 cache-misses # 61,213 % of all cache refs
40284812779 cycles
26515783790 instructions # 0,66 insns per cycle
3188784664 branches
48179 faults
3 migrations
Очевидно, что параллельная версия генерирует гораздо больше ссылок на кэш, циклов и ошибок, но почему?
1 ответ
Вы делитесь тем же result
переменная: все потоки накапливаются на atomic<uint64_t> result
молотить кеш!
Каждый раз, когда поток пишет result
все кэши в других ядрах становятся недействительными: это приводит к конфликту строк кэша.
Дополнительная информация:
"Разделение является корнем всех споров".
[...] для записи в ячейку памяти ядро должно дополнительно иметь исключительное право собственности на строку кэша, содержащую эту ячейку. В то время как одно ядро используется исключительно, все остальные ядра, пытающиеся записать одну и ту же область памяти, должны подождать и по очереди - то есть они должны работать последовательно. Концептуально это выглядит так, как если бы каждая строка кэша была защищена аппаратным мьютексом, где только одно ядро может одновременно удерживать аппаратную блокировку этой строки кэша.
Эта статья о "ложном обмене", в которой описана похожая проблема, более подробно объясняет, что происходит в кэшах.
Я внес некоторые изменения в вашу программу и достиг следующих результатов (на машине с i7-4770K [8 потоков + гиперпоточность]):
Generating array ...
parallel_for_each ...
Result = 63748111806
Time: 195
for_each ...
Result = 63748111806
Time: 2727
Параллельная версия примерно на 92% быстрее, чем последовательная версия.
std::future
а такжеstd::packaged_task
являются тяжеловесными абстракциями. В этом случаеstd::experimental::latch
достаточно.Каждая задача отправляется в пул потоков Это минимизирует накладные расходы на создание потока.
Каждое задание имеет свой собственный аккумулятор. Это исключает обмен.
Код доступен здесь на моем GitHub. Он использует некоторые личные зависимости, но вы должны понимать изменения независимо.
Вот самые важные изменения:
// A latch is being used instead of a vector of futures.
ecst::latch l(num_threads - 1);
l.execute_and_wait_until_zero([&]
{
auto block_start = first;
for (unsigned i = 0; i < num_threads - 1; ++i)
{
auto block_end = block_start;
std::advance(block_end, block_size);
// `p` is a thread pool.
// Every task posted in the thread pool has its own `tempacc` accumulator.
p.post([&, block_start, block_end, tempacc = 0ull]() mutable
{
// The task accumulator is filled up...
std::for_each(block_start, block_end, [&tempacc](auto x){ tempacc += x; });
// ...and then the atomic variable is incremented ONCE.
func(tempacc);
l.decrement_and_notify_all();
});
block_start = block_end;
}
// Same idea here: accumulate to local non-atomic counter, then
// add the partial result to the atomic counter ONCE.
auto tempacc2 = 0ull;
std::for_each(block_start, last, [&tempacc2](auto x){ tempacc2 += x; });
func(tempacc2);
});