Использование потоков OpenMP и std: :( экспериментально::)simd для вычисления набора Мандельброта
Я хочу реализовать простой плоттер множеств Мандельброта с использованием различных парадигм HPC, показывающий их сильные и слабые стороны и насколько легко или сложно их реализации. Подумайте о GPGPU (CUDA/OpenACC/OpenMP4.5), потоковой передаче / OpenMP и MPI. И используйте эти примеры, чтобы дать программистам, плохо знакомым с высокопроизводительными вычислениями, и увидеть, каковы возможности. Ясность кода важнее, чем получение абсолютной максимальной производительности от оборудования, это второй шаг;)
Поскольку задача распараллеливания тривиальна, а современные процессоры могут добиться огромной производительности с использованием векторных инструкций, я также хочу объединить OpenMP и SIMD. К сожалению, просто добавив#pragma omp simd
не дает удовлетворительных результатов, а использование встроенных функций не очень удобно для пользователя и не годится. Или хорошенько.
К счастью, ведется работа над стандартом C++, так что должно быть проще реализовать общие векторные инструкции, как указано в TS: "Расширения для параллелизма, версия 2", в частности, в разделе 9 о типах, параллельных данным. WIP реализации можно найти здесь, который основан на VC, который можно найти здесь.
Предположим, у меня есть следующий класс (который был изменен, чтобы сделать его немного проще)
#include <stddef.h>
using Range = std::pair<double, double>;
using Resolution = std::pair<std::size_t, std::size_t>;
class Mandelbrot
{
double* d_iters;
Range d_xrange;
Range d_yrange;
Resolution d_res;
std::size_t d_maxIter;
public:
Mandelbrot(Range xrange, Range yrange, Resolution res, std::size_t maxIter);
~Mandelbrot();
void writeImage(std::string const& fileName);
void computeMandelbrot();
private:
void calculateColors();
};
И следующая реализация computeMandelbrot()
используя OpenMP
void Mandelbrot::computeMandelbrot()
{
double dx = (d_xrange.second - d_xrange.first) / d_res.first;
double dy = (d_yrange.second - d_yrange.first) / d_res.second;
#pragma omp parallel for schedule(dynamic)
for (std::size_t row = 0; row != d_res.second; ++row)
{
double c_imag = d_yrange.first + row * dy;
for (std::size_t col = 0; col != d_res.first; ++col)
{
double real = 0.0;
double imag = 0.0;
double realSquared = 0.0;
double imagSquared = 0.0;
double c_real = d_xrange.first + col * dx;
std::size_t iter = 0;
while (iter < d_maxIter && realSquared + imagSquared < 4.0)
{
realSquared = real * real;
imagSquared = imag * imag;
imag = 2 * real * imag + c_imag;
real = realSquared - imagSquared + c_real;
++iter;
}
d_iters[row * d_res.first + col] = iter;
}
}
}
Мы можем предположить, что разрешения для обоих направлений x и y кратны 2/4/8/.., в зависимости от того, какие инструкции SIMD мы используем.
К сожалению, в Интернете очень мало информации о std::experimental::simd
. Никаких нетривиальных примеров, насколько я смог найти.
В репозитории Vc git есть реализация калькулятора множеств Мандельброта, но она довольно запутанная и из-за отсутствия комментариев довольно трудна для понимания.
Понятно, что мне нужно изменить типы данных дублеров в функции computeMandelbrot()
, но я не уверен в чем. TS упоминает два основных новых типа данных для некоторого типа T,
native_simd = std::experimental::simd<T, std::experimental::simd_abi::native>;
а также
fixed_size_simd = std::experimental::simd<T, std::experimental::simd_abi::fixed_size<N>>;
С помощью native_simd
имеет смысл, так как я не знаю своих границ во время компиляции. Но тогда мне непонятно, что представляют собой эти типы,native_simd<double>
одиночный двойной или это набор двойников, на которых выполняется векторная инструкция? А сколько тогда дублей в этой коллекции?
Если бы кто-нибудь мог указать мне на примеры, в которых используются эти концепции, или дать мне несколько указаний о том, как реализовать векторные инструкции с помощью std::experimental::simd, я был бы очень благодарен.
1 ответ
Вот очень простая реализация, которая работает (насколько я могу судить). Проверка того, какие элементы вектора имеют абсолютное значение больше 2, выполняется очень громоздко и неэффективно. Должен быть способ сделать это лучше, но я его еще не нашел.
Я получаю увеличение производительности примерно на 72% на AMD Ryzen 5 3600 и предоставляю g++ возможность -march=znver2
, что меньше ожидаемого.
template <class T>
void mandelbrot(T xstart, T xend,
T ystart, T yend)
{
namespace stdx = std::experimental;
constexpr auto simdSize = stdx::native_simd<T>().size();
constexpr unsigned size = 4096;
constexpr unsigned maxIter = 250;
assert(size % simdSize == 0);
unsigned* res = new unsigned[size * size];
T dx = (xend - xstart) / size;
T dy = (yend - ystart) / size;
for (std::size_t row = 0; row != size; ++row)
{
T c_imag = ystart + row * dy;
for (std::size_t col = 0; col != size; col += simdSize)
{
stdx::native_simd<T> real{0};
stdx::native_simd<T> imag{0};
stdx::native_simd<T> realSquared{0};
stdx::native_simd<T> imagSquared{0};
stdx::fixed_size_simd<unsigned, simdSize> iters{0};
stdx::native_simd<T> c_real;
for (int idx = 0; idx != simdSize; ++idx)
{
c_real[idx] = xstart + (col + idx) * dx;
}
for (unsigned iter = 0; iter != maxIter; ++iter)
{
realSquared = real * real;
imagSquared = imag * imag;
auto isInside = realSquared + imagSquared > stdx::native_simd<T>{4};
for (int idx = 0; idx != simdSize; ++idx)
{
// if not bigger than 4, increase iters
if (!isInside[idx])
{
iters[idx] += 1;
}
else
{
// prevent that they become inf/nan
real[idx] = static_cast<T>(4);
imag[idx] = static_cast<T>(4);
}
}
if (stdx::all_of(isInside) )
{
break;
}
imag = static_cast<T>(2.0) * real * imag + c_imag;
real = realSquared - imagSquared + c_real;
}
iters.copy_to(res + row * size + col, stdx::element_aligned);
}
}
delete[] res;
}
Весь тестовый код (начиная с auto test = (...)
) компилируется до
.L9:
vmulps ymm1, ymm1, ymm1
vmulps ymm13, ymm2, ymm2
xor eax, eax
vaddps ymm2, ymm13, ymm1
vcmpltps ymm2, ymm5, ymm2
vmovaps YMMWORD PTR [rsp+160], ymm2
jmp .L6
.L3:
vmovss DWORD PTR [rsp+32+rax], xmm0
vmovss DWORD PTR [rsp+64+rax], xmm0
add rax, 4
cmp rax, 32
je .L22
.L6:
vucomiss xmm3, DWORD PTR [rsp+160+rax]
jp .L3
jne .L3
inc DWORD PTR [rsp+96+rax]
add rax, 4
cmp rax, 32
jne .L6