Есть ли обходной путь для этого барьера OpenMP?
У меня есть этот параллельный регион, написанный на OpenMp:
std::vector<T> sharedResult;
#pragma omp parallel
{
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
#pragma omp critical{
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
}
#pramga omp barrier
#pragma omp for nowait
for(size_t i=0; i<sharedResult.size(); i++){
foo(sharedResult[i]);
}
...
}
Боюсь что #pragma omp barrier
является необходимым. Причина, по-моему, заключается в том, что в противном случае, когда поток достиг последнего #pragma omp for
, sharedResult.size()
в этот момент все еще не в своем конечном состоянии (полученном, когда предыдущая параллель для завершена). Обратите внимание, что, к сожалению, sharedResult
Размер ранее неизвестен.
К сожалению, я заметил, что этот барьер создает большие накладные расходы, то есть одна конкретная итерация дороже, чем все остальные, поэтому все потоки должны ждать потока, который выполняет эту итерацию. Это можно рассматривать как дисбаланс нагрузки, но я не нашел решения, чтобы решить эту проблему.
Итак, мой вопрос: есть ли способ запустить последнюю параллель, не дожидаясь завершения предыдущей, или нет серьезного способа улучшить это?
1 ответ
Я бы согласился, что барьер необходим. Я вижу несколько выходов, с возрастающей сложностью и, вероятно, с возрастающей эффективностью:
Задачи
Разместите задачу для каждого элемента результата:
#pragma omp parallel
{
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
// would prefer a range based loop here, but
// there seem to be issues with passing references
// to tasks in certain compilers
for(size_t i=0; i<result.size(); i++){
{
#pragma omp task
foo(result[i]);
}
}
Вы даже можете опубликовать задачу в начальном цикле. Если задач слишком много, вы можете получить значительные накладные расходы.
Обработка очереди результатов с готовыми потоками
Теперь это сложнее - в частности, вам нужно различать очередь результатов пустую и все потоки, завершающие свой первый цикл.
std::vector<T> sharedResult;
int threadsBusy;
size_t resultIndex = 0;
#pragma omp parallel
{
#pragma omp single
threadsBusy = omp_num_threads();
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++){
//fill result
}
#pragma omp critical
{
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
threadsBusy--;
}
do {
bool hasResult, allThreadsDone;
// We need a copy here as the vector may be resized
// and elements may become invalid by insertion
T myResult;
#pragma omp critical
{
if (resultIndex < sharedResult.size()) {
resultIndex++;
hasResult = true;
myResult = sharedResult[myResult];
} else {
hasResult = false;
}
allThreadsDone = threadsBusy == 0;
}
if (hasResult) {
foo(myResult);
} else {
if (allThreadsDone) {
break;
}
// If we just continue here, we will spin on the mutex
// Unfortunately there are no condition variables in OpenMP
// So instead we go for a quick nap as a compromise
// Feel free to tune this accordingly
std::this_thread::sleep_for(10ms);
}
} while (true);
}
Примечание. Обычно я тестирую код, который публикую здесь, но не смог из-за отсутствия полного примера.
Обработка результатов в чанках через параллельные циклы
Наконец, вы можете запустить параллельные циклы несколько раз для тех результатов, которые уже сделаны. Однако это имеет ряд проблем. Во-первых, каждая область общего доступа должна встречаться всеми потоками, даже теми, которые заканчивают первый поздно. Таким образом, вам придется отслеживать циклы, которые вы запускаете. Кроме того, границы цикла должны быть одинаковыми для каждого потока - и вы должны только читать sharedResult.size()
в критическом разделе. Таким образом, вы должны прочитать это заранее в общую переменную одного потока в критическом разделе, но ждать со всеми потоками, пока он не будет прочитан должным образом. Кроме того, вам придется использовать динамическое планирование, в противном случае вы, скорее всего, будете использовать статическое планирование, и в любом случае вы будете ожидать завершения потоков. Вы отредактировали пример, не делающий ничего из этого. Я бы не принял это как должное, что for nowait schedule(dynamic)
может завершиться до того, как все потоки в команде войдут в него (но это работает с libgomp). Учитывая все обстоятельства, я бы не пошел туда.