Параллельные кумулятивные (префиксные) суммы в OpenMP: передача значений между потоками

Предположим, у меня есть функция f(i) который зависит от индекса i (среди других значений, которые не могут быть предварительно вычислены). Я хочу заполнить массив a чтобы a[n] = sum(f(i)) from i=0 to n-1,

Редактировать: После комментария Христо Илиева я понял, что я делаю, это кумулятивная сумма / сумма префикса.

Это можно записать в коде как

float sum = 0;
for(int i=0; i<N; i++) {
    sum += f(i);
    a[i] = sum;
}

Теперь я хочу использовать OpenMP, чтобы сделать это параллельно. Один из способов сделать это с помощью OpenMP - выписать значения для f(i) параллельно, а затем позаботиться о зависимости в последовательном. Если f(i) это медленная функция, тогда это может хорошо работать, так как непараллельный цикл прост.

#pragma omp parallel for
for(int i=0; i<N; i++) {
    a[i] = f(i);
}
for(int i=1; i<N; i++) {
    a[i] += a[i-1];
}

Но это возможно сделать без непараллельного цикла с OpenMP. Однако решение, которое я нашел, сложное и, возможно, хакерское. Поэтому мой вопрос: есть ли более простой и менее запутанный способ сделать это с OpenMP?

Код ниже в основном запускает первый код, который я перечислил для каждого потока. Результатом является то, что значения a в заданном потоке верны с точностью до константы. Я сохраняю сумму для каждого потока в массив suma с nthreads+1 элементы. Это позволяет мне общаться между потоками и определять постоянное смещение для каждого потока. Тогда я исправляю значения a[i] со смещением.

float *suma;
#pragma omp parallel
{
    const int ithread = omp_get_thread_num();
    const int nthreads = omp_get_num_threads();
    const int start = ithread*N/nthreads;
    const int finish = (ithread+1)*N/nthreads;
    #pragma omp single
    {
        suma = new float[nthreads+1];
        suma[0] = 0;
    }
    float sum = 0;
    for (int i=start; i<finish; i++) {
        sum += f(i);
        a[i] = sum;
    }
    suma[ithread+1] = sum;
    #pragma omp barrier
    float offset = 0;
    for(int i=0; i<(ithread+1); i++) {
        offset += suma[i];
    }
    for(int i=start; i<finish; i++) {
        a[i] += offset;
    }
}
delete[] suma;

Простой тест просто установить f(i) = i, Тогда решение a[i] = i*(i+1)/2 (и на бесконечности это -1/12).

2 ответа

Решение

Вы можете расширить свою стратегию на произвольное количество субрегионов и рекурсивно уменьшить их, используя задачи:

#include<vector>
#include<iostream>

using namespace std;

const int n          = 10000;
const int baseLength = 100;

int f(int ii) {
  return ii;
}

int recursiveSumBody(int * begin, int * end){

  size_t length  = end - begin;
  size_t mid     = length/2;
  int    sum     = 0;


  if ( length < baseLength ) {
    for(size_t ii = 1; ii < length; ii++ ){
        begin[ii] += begin[ii-1];
    }
  } else {
#pragma omp task shared(sum)
    {
      sum = recursiveSumBody(begin    ,begin+mid);
    }
#pragma omp task
    {
      recursiveSumBody(begin+mid,end      );
    }
#pragma omp taskwait

#pragma omp parallel for
    for(size_t ii = mid; ii < length; ii++) {
      begin[ii] += sum;
    }

  }
  return begin[length-1];
}

void recursiveSum(int * begin, int * end){

#pragma omp single
  {
    recursiveSumBody(begin,end);
  }    
}


int main() {

  vector<int> a(n,0);

#pragma omp parallel
  {
    #pragma omp for
    for(int ii=0; ii < n; ii++) {          
      a[ii] = f(ii);
    }  

    recursiveSum(&a[0],&a[n]);

  }
  cout << n*(n-1)/2 << endl;
  cout << a[n-1] << endl;

  return 0;
}

Для полноты я добавляю код MWE OP, когда учитывается замечание Христо:

      #include <iostream>
#include <omp.h>
using std::cout;
using std::endl;

const int N = 10;
const int Nthr = 4;
float f(int i) {return (float)i;}

int main(void) {
    omp_set_num_threads(Nthr);
    float* a = new float[N];
    float *suma = new float[Nthr+1];
    suma[0] = 0.0;
    float sum = 0.0;
#pragma omp parallel for schedule(static) firstprivate(sum)
    for (int i=0; i<N; i++) {
        sum += f(i);
        a[i] = sum;
        suma[omp_get_thread_num()+1] = sum;
    }

    // this for-loop is also a commulative sum, but it has only Nthr iterations
    for (int i=1; i<Nthr;i++)
        suma[i] += suma[i-1];

#pragma omp parallel for schedule(static)
    for(int i=0; i< N; i++) {
        a[i] += suma[omp_get_thread_num()];
    }

    for (int i=0; i<N; i++) {
        cout << a[i] << endl;
    }

    delete[] suma;
    int n = 95;
    cout << a[n] << endl << n*(n+1)/2 << endl;
    delete[] a;
    return 0;
}

Другие вопросы по тегам