Реализация очереди кражи работы в C/C++?
Я ищу правильную реализацию очереди кражи работы в C/CPP. Я посмотрел вокруг Google, но не нашел ничего полезного.
Возможно, кто-то знаком с хорошей реализацией с открытым исходным кодом? (Я предпочитаю не реализовывать псевдокод, взятый из оригинальных научных работ).
13 ответов
Нет бесплатного обеда.
Пожалуйста, посмотрите оригинальную работу по краже бумаги. Этот документ трудно понять. Я знаю, что статья содержит теоретическое доказательство, а не псевдокод. Тем не менее, просто нет такой гораздо более простой версии, чем TBB. Если таковые имеются, это не даст оптимальной производительности. Сама кража работы влечет за собой некоторые накладные расходы, поэтому оптимизация и приемы очень важны. Особенно, очереди должны быть потокобезопасными. Реализация синхронизации с высокой степенью масштабируемости и минимальными издержками является сложной задачей.
Мне действительно интересно, зачем тебе это нужно. Я думаю, что правильная реализация означает что-то вроде TBB и Cilk. Опять же, воровство трудновыполнимо.
Реализовать "воровство работы" не сложно в теории. Вам нужен набор очередей, содержащих задачи, которые работают, выполняя комбинацию вычислений и генерируя другие задачи, чтобы выполнять больше работы. И вам нужен атомарный доступ к очередям, чтобы помещать вновь созданные задачи в эти очереди. Наконец, вам нужна процедура, которую каждая задача вызывает в конце, чтобы найти больше работы для потока, выполнившего задачу; эта процедура должна искать в рабочих очередях, чтобы найти работу.
В большинстве таких систем кражи работы предполагается, что имеется небольшое количество потоков (обычно резервируемых реальными ядрами процессора), и что на каждый поток существует ровно одна рабочая очередь. Затем вы сначала пытаетесь украсть работу из собственной очереди, а если она пуста, попробуйте украсть у других. Сложно знать, какие очереди искать; Сканирование их поочередно для работы довольно дорого и может вызвать колоссальные разногласия между потоками в поисках работы.
Пока это все довольно общие вещи с одним двумя основными исключениями: 1) переключение контекста (например, установка регистров контекста процессора, таких как "стек") не может быть задано в чистом C или C++. Вы можете решить эту проблему, согласившись написать часть своего пакета в машинном коде целевой платформы. 2) Атомарный доступ к очередям для мультипроцессора не может быть сделан исключительно на C или C++ (без учета алгоритма Деккера), поэтому вам нужно будет кодировать те, которые используют примитивы синхронизации на ассемблере, такие как X86 LOCK XCH или Compare and Swap. Теперь код, используемый для обновления очереди после получения безопасного доступа, не очень сложен, и вы можете легко написать это в несколько строк в C.
Тем не менее, я думаю, вы обнаружите, что попытка кодировать такой пакет на C и C++ с помощью смешанного ассемблера все еще довольно неэффективна, и в конечном итоге вы все равно в конечном итоге закодируете все это на ассемблере. Все, что осталось, это C/C++ совместимые точки входа:-}
Я сделал это для нашего языка параллельного программирования PARLANSE, который предлагает идею сколь угодно большого числа параллельных вычислений, работающих и взаимодействующих (синхронизирующих) в любой момент. Он реализован за кулисами на X86 точно с одним потоком на процессор, и реализация полностью на ассемблере. Код для кражи работы - это, вероятно, всего 1000 строк, и его сложный код, потому что вы хотите, чтобы он был очень быстрым в неконфликтном случае.
Реальная ложь мази для C и C++ заключается в том, что, когда вы создаете задачу, представляющую работу, сколько стекового пространства вы назначаете? Программы на последовательном C/C++ избегают этого вопроса, просто перераспределяя огромные объемы (например, 10 МБ) одного линейного стека, и никто не заботится о том, сколько этого стекового пространства тратится впустую. Но если вы можете создавать тысячи задач и выполнять их все в определенный момент времени, вы не сможете разумно выделить 10 МБ для каждой из них. Так что теперь вам нужно либо статически определить, сколько стекового пространства понадобится задаче (сложный по Тьюрингу), либо вам нужно выделить куски стека (например, на вызов функции), чего не делают широко доступные компиляторы C/C++ (например, тот, который вы, вероятно, используете). Последний выход - ограничить создание задач, чтобы в любой момент ограничить их несколькими сотнями, и объединить несколько сотен действительно огромных стеков среди текущих задач. Вы не можете сделать последнее, если задачи могут заблокировать / приостановить состояние, потому что вы столкнетесь с вашим порогом. Таким образом, вы можете сделать это, только если задачи выполняют только вычисления. Это выглядит как довольно серьезное ограничение.
Для PARLANSE мы создали компилятор, который распределяет записи активации в куче для каждого вызова функции.
Взгляните на многопоточные блоки Intel.
Эта библиотека с открытым исходным кодом https://github.com/cpp-taskflow/cpp-taskflow поддерживает пул потоков рабочих краж с декабря 2018 года.
Посмотрите на WorkStealingQueue
класс, который реализует очередь на кражу работы, как описано в статье "Динамическая круговая рабочая кража", SPAA, 2015.
Если вы ищете отдельную реализацию класса очереди рабочего стола в C++, построенную на pthread или boost::thread, то удачи, насколько мне известно, ее нет.
Однако, как говорили другие, у Cilk, TBB и PPL от Microsoft есть встроенные реализации рабочих столов.
Вопрос в том, хотите ли вы использовать очередь рабочих столов или внедрить ее? Если вы просто хотите использовать один из них, то вышеприведенные варианты являются хорошими отправными точками, для этого достаточно просто запланировать "задачу" в любом из них.
Как сказал BlueRaja, task_group & structured_task_group в PPL сделает это, также обратите внимание, что эти классы также доступны в последней версии Intel TBB. Параллельные циклы (parallel_for, parallel_for_each) также реализованы с помощью рабочего стола.
Если вам нужно смотреть на источник, а не использовать реализацию, TBB - это OpenSource, а Microsoft поставляет источники для своего CRT, так что вы можете приступить к изучению.
Вы также можете посмотреть в блоге Джо Даффи о реализации C# (но это C# и модель памяти другая).
-Rick
Существует инструмент, позволяющий сделать это очень элегантным способом. Это действительно эффективный способ распараллелить вашу программу за очень короткое время.
Премия HPC Challenge
Наша заявка Cilk на премию HPC Challenge Class 2 была удостоена награды 2006 года за "Лучшее сочетание элегантности и производительности". Награда была вручена в SC'06 в Тампе 14 ноября 2006 года.
OpenMP вполне может поддерживать кражу работы, хотя его называют рекурсивным параллелизмом
Спецификация OpenMP определяет конструкции задач (которые могут быть вложенными, поэтому очень подходят для рекурсивного параллелизма), но не определяет детали того, как они реализованы. Реализации OpenMP, включая gcc, обычно используют некоторую форму кражи работы для задач, хотя точный алгоритм (и результирующая производительность) могут отличаться!
Увидеть #pragma omp task
а также #pragma omp taskwait
Обновить
Глава 9 книги C++ Concurrency in Action описывает, как реализовать "кражу работы для потоков пула". Я не читал / не реализовал это сам, но это не выглядит слишком сложно.
Класс structd_task_group в PPL использует очередь для кражи работы для своей реализации. Если вам нужен WSQ для многопоточности, я бы порекомендовал это.
Если вы действительно ищете источник, я не знаю, указан ли код в ppl.h или есть предварительно скомпилированный объект; Я должен буду проверить, когда вернусь домой сегодня вечером.
Поможет ли разбивка ваших рабочих задач на более мелкие блоки исключить необходимость в краже работы?
Я перенес этот проект C на C++.
Оригинал Steal
может возникнуть грязное чтение при расширении массива. Я попытался исправить ошибку, но в конце концов сдался, потому что мне фактически не нужен динамически растущий стек. Вместо того, чтобы пытаться выделить место, Push
метод просто возвращает false
, Затем вызывающая сторона может выполнить ожидание вращения, то есть while(!stack->Push(value)){}
,
#pragma once
#include <atomic>
// A lock-free stack.
// Push = single producer
// Pop = single consumer (same thread as push)
// Steal = multiple consumer
// All methods, including Push, may fail. Re-issue the request
// if that occurs (spinwait).
template<class T, size_t capacity = 131072>
class WorkStealingStack {
public:
inline WorkStealingStack() {
_top = 1;
_bottom = 1;
}
WorkStealingStack(const WorkStealingStack&) = delete;
inline ~WorkStealingStack()
{
}
// Single producer
inline bool Push(const T& item) {
auto oldtop = _top.load(std::memory_order_relaxed);
auto oldbottom = _bottom.load(std::memory_order_relaxed);
auto numtasks = oldbottom - oldtop;
if (
oldbottom > oldtop && // size_t is unsigned, validate the result is positive
numtasks >= capacity - 1) {
// The caller can decide what to do, they will probably spinwait.
return false;
}
_values[oldbottom % capacity].store(item, std::memory_order_relaxed);
_bottom.fetch_add(1, std::memory_order_release);
return true;
}
// Single consumer
inline bool Pop(T& result) {
size_t oldtop, oldbottom, newtop, newbottom, ot;
oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
ot = oldtop = _top.load(std::memory_order_acquire);
newtop = oldtop + 1;
newbottom = oldbottom - 1;
// Bottom has wrapped around.
if (oldbottom < oldtop) {
_bottom.store(oldtop, std::memory_order_relaxed);
return false;
}
// The queue is empty.
if (oldbottom == oldtop) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
// Make sure that we are not contending for the item.
if (newbottom == oldtop) {
auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
else {
result = ret;
_bottom.store(newtop, std::memory_order_release);
return true;
}
}
// It's uncontended.
result = _values[newbottom % capacity].load(std::memory_order_acquire);
return true;
}
// Multiple consumer.
inline bool Steal(T& result) {
size_t oldtop, newtop, oldbottom;
oldtop = _top.load(std::memory_order_acquire);
oldbottom = _bottom.load(std::memory_order_relaxed);
newtop = oldtop + 1;
if (oldbottom <= oldtop)
return false;
// Make sure that we are not contending for the item.
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
return false;
}
result = _values[oldtop % capacity].load(std::memory_order_relaxed);
return true;
}
private:
// Circular array
std::atomic<T> _values[capacity];
std::atomic<size_t> _top; // queue
std::atomic<size_t> _bottom; // stack
};
Полный Gist (включая юнит-тесты). Я проводил тесты только на сильной архитектуре (x86/64), поэтому слабые архитектуры могут измениться, если вы попытаетесь использовать это, например, на Neon/PPC.
Я не думаю, что JobSwarm использует кражу работы, но это первый шаг. Я не знаю других библиотек с открытым исходным кодом для этой цели.
Не знаю, поможет ли это вам, но посмотрите на эту статью о сети разработчиков AMD, она проста, но должна дать вам кое-что полезное