C++: как перенести выравнивание и ограничить атрибуты в функцию доступа к шаблону

Я хочу вычислить общий матричный продукт

C = C + alpha*A*B

Теперь, особая ситуация в моем контексте состоит в том, что я знаю количество столбцов A (соответственно количество строк B) во время компиляции. Поэтому я могу жестко закодировать эту информацию, что, естественно, значительно повышает производительность.

До сих пор я делал это на Фортране, который работает, но требует от меня переписывать один и тот же код снова и снова для всех возможных сценариев. Поэтому я хотел бы написать одну версию C++ этой функции, в которой я передаю информацию о размере во время компиляции.

После небольшой попытки я наконец-то пришел к следующему решению

template<size_t sizeA2>
void matrixMatrixProduct( double* __restrict__ pC,
                          double const* __restrict__ pA,
                          double const* __restrict__ pB,
                          double alpha,
                          size_t sizeA1,
                          size_t sizeB2 )
{

  size_t outerLoopLimit = sizeA1;
  size_t innerLoopLimit = sizeB2;

  size_t sizeC2 = sizeB2;

  for ( size_t i = 0; i < outerLoopLimit; ++i )
  {
#pragma vector aligned
#pragma ivdep
    for ( size_t j = 0; j < innerLoopLimit; ++j )
    {
#pragma vector aligned
#pragma ivdep
      for ( size_t k = 0; k < sizeA2; ++k )
      {
        pC[i * sizeC2 + j] += alpha * pA[i * sizeA2 + k] * pB[k * sizeB2 + j];
      } // end of k-loop
    } // end of j-loop
  } // end of i-loop

}

Добавив ключевое слово restrict и две прагмы, я мог бы убедить компилятор Intel Cpp векторизовать j-цикл так, чтобы я получил максимальную скорость около 19 GFlops на одно ядро ​​Intel(R) Core(TM) i7-4790. Процессор @ 3.60GHz, который полностью соответствует контрагенту Fortran.

Теперь я хочу расширить функциональность, чтобы я мог транспонировать три матрицы A, B и C по отдельности. Для этого я планировал добавить политику доступа к шаблонам, которая при необходимости переворачивает два индекса. Я расширил код следующим образом

template<size_t sizeA2,
         typename AccessOperatorA = NoTranspose,
         typename AccessOperatorB = NoTranspose,
         typename AccessOperatorC = NoTranspose >
void matrixMatrixProduct( double* __restrict__ pC,
                          double const* __restrict__ pA,
                          double const* __restrict__ pB,
                          double alpha,
                          size_t sizeA1,
                          size_t sizeB2 )
{

  size_t outerLoopLimit = sizeA1;
  size_t innerLoopLimit = sizeB2;

  size_t sizeC2 = sizeB2;

  for ( size_t i = 0; i < outerLoopLimit; ++i )
  {
#pragma vector aligned
#pragma ivdep
    for ( size_t j = 0; j < innerLoopLimit; ++j )
    {
#pragma vector aligned
#pragma ivdep
      for ( size_t k = 0; k < sizeA2; ++k )
      {
        AccessOperatorC::get( pC, sizeC2,  i, j ) += alpha * AccessOperatorA::get( pA, sizeA2, i, k ) * AccessOperatorB::get( pB, sizeB2, k, j );
      } // end of k-loop
    } // end of j-loop
  } // end of i-loop

}

с оператором доступа NoTranspose, определяемым как

struct NoTranspose
{
  template<typename DataType>
  static inline DataType& get( DataType* __restrict__ pointer,
                               size_t size2,
                               size_t i,
                               size_t j )
  {
    return pointer[i * size2 + j];
  }
};

В принципе, этот код компилируется и дает правильные ответы, но только на 60% скорости!

Насколько я понимаю, проблемы заключаются в том, что информация о выравнивании не передается в функцию get шаблона доступа, хотя функция get встроена в соответствии с perf. отчет. Однако icpc, похоже, решает работать только с регистрами xmm, что приводит к снижению производительности.

Поэтому мой вопрос: как заставить компилятор правильно проверять расширенный код?

Любая помощь с благодарностью.

Для полноты добавляю полный MWE

#include <stdio.h>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <chrono>
#include <cmath>

struct NoTranspose
{
  template<typename DataType>
  static inline DataType& get( DataType* __restrict__ pointer,
                               size_t size2,
                               size_t i,
                               size_t j )
  {
    return pointer[i * size2 + j];
  }
};

template<size_t sizeA2,
         typename AccessOperatorA = NoTranspose,
         typename AccessOperatorB = NoTranspose,
         typename AccessOperatorC = NoTranspose >
void matrixMatrixProduct( double* __restrict__ pC,
                          double const* __restrict__ pA,
                          double const* __restrict__ pB,
                          double alpha,
                          size_t sizeA1,
                          size_t sizeB2 )
{

  size_t outerLoopLimit = sizeA1;
  size_t innerLoopLimit = sizeB2;

  size_t sizeC2 = sizeB2;

  for ( size_t i = 0; i < outerLoopLimit; ++i )
  {
#pragma vector aligned
#pragma ivdep
    for ( size_t j = 0; j < innerLoopLimit; ++j )
    {
#pragma vector aligned
#pragma ivdep
      for ( size_t k = 0; k < sizeA2; ++k )
      {
        AccessOperatorC::get( pC, sizeC2,  i, j ) += alpha * AccessOperatorA::get( pA, sizeA2, i, k ) * AccessOperatorB::get( pB, sizeB2, k, j );
      } // end of k-loop
    } // end of j-loop
  } // end of i-loop

}

int main( void )
{

  std::vector<int> sizesA1 = { 1, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 30, 40, 50, 60, 70, 80, 90, 100, 125, 150, 175, 200, 250, 300, 350, 400, 500, 600, 700, 800, 900, 1000 };

  printf( "%15s \t %15s \t%15s \t %15s\n", "sizeA1", "sizeA2", "Time [s]", "GFlops" );

  for ( const auto & sizeA1 : sizesA1 )
  {
    int numberOfIterations = 1e6;

    const int sizeA2 = 2;

    size_t sizeB2 = sizeA1;

    size_t sizeC1 = sizeA1;
    size_t sizeC2 = sizeB2;

    size_t lengthA = sizeA1 * sizeA2;
    size_t lengthB = sizeA2 * sizeB2;
    int lengthC = sizeC1 * sizeC2;

    std::vector<double> A1( lengthA, 1.234 );
    std::vector<double> B1( lengthB, 1.234 );
    std::vector<double> C1( lengthC, 1.234 );
    std::vector<double> A2( lengthA, 1.234 );
    std::vector<double> B2( lengthB, 1.234 );
    std::vector<double> C2( lengthC, 1.234 );

    double alpha = 1.234;

    auto start = std::chrono::high_resolution_clock::now( );
    for ( int i = 0; i < numberOfIterations; ++i )
    {
      double* pA;
      double* pB;
      double* pC;

      //Force cache reload
      if ( i % 2 )
      {
        pA = A1.data( );
        pB = B1.data( );
        pC = C1.data( );
      }
      else
      {
        pA = A2.data( );
        pB = B2.data( );
        pC = C2.data( );
      }

      matrixMatrixProduct<sizeA2>( pC, pA, pB, alpha, sizeA1, sizeB2 );

    }
    auto end = std::chrono::high_resolution_clock::now( );
    std::chrono::duration<double> elapsed = end - start;

    double numberOfFlops = 1.0 * numberOfIterations * lengthC * ( 3 + 2 ); //two adds and three mult!
    double flops = (double) numberOfFlops / ( elapsed.count( ) );

    printf( "%15d \t %15d \t %15g \t %15e\n", sizeA1, sizeA2, elapsed.count( ), flops / ( 1.0e9 ) );

  }

  return 0;
}

0 ответов

Другие вопросы по тегам