Многопоточность не эффективна: отладка ложного обмена?

У меня есть следующий код, который запускает несколько потоков (пул потоков) в самом начале (startWorkers()). Впоследствии, в какой-то момент у меня есть контейнер, полный myWorkObject экземпляры, которые я хочу обрабатывать, используя несколько рабочих потоков одновременно. myWorkObject полностью изолированы от другого с точки зрения использования памяти. Сейчас давайте предположим, что у myWorkObject есть метод doWorkIntenseStuffHere() который занимает некоторое время процессора для расчета.

При тестировании следующего кода я заметил, что этот код плохо масштабируется с количеством потоков, а издержки на инициализацию / синхронизацию рабочих потоков превышают преимущества многопоточности, если нет активных 3-4 потоков. Я посмотрел на эту проблему и прочитал о проблеме ложного обмена, и я предполагаю, что мой код страдает от этой проблемы. Тем не менее, я хотел бы отладить / профилировать мой код, чтобы увидеть, происходит ли какое-то голодание / ложный обмен. Как я могу это сделать? Пожалуйста, не стесняйтесь критиковать что-либо о моем коде, так как я все еще много узнаю о памяти / процессорах и многопоточности в частности.

#include <boost/thread.hpp>

class MultiThreadedFitnessProcessingStrategy
{
public:
    MultiThreadedFitnessProcessingStrategy(unsigned int numWorkerThreads):
        _startBarrier(numWorkerThreads + 1),
        _endBarrier(numWorkerThreads + 1),
        _started(false),
        _shutdown(false),
        _numWorkerThreads(numWorkerThreads)
    {
        assert(_numWorkerThreads > 0);
    }


    virtual ~MultiThreadedFitnessProcessingStrategy()
    {
        stopWorkers();
    }


void startWorkers()
{
    _shutdown = false;
    _started = true;

    for(unsigned int i = 0; i < _numWorkerThreads;i++)
    {
        boost::thread*  workerThread = new boost::thread(
                boost::bind(&MultiThreadedFitnessProcessingStrategy::workerTask, this,i)
        );
        _threadQueue.push_back(new std::queue<myWorkObject::ptr>());
        _workerThreads.push_back(workerThread);
    }
}


void stopWorkers()
{
    _startBarrier.wait();
    _shutdown = true;
    _endBarrier.wait();

    for(unsigned int i = 0; i < _numWorkerThreads;i++)
    {
        _workerThreads[i]->join();
    }

}

void workerTask(unsigned int id)
{

    //Wait until all worker threads have started.
    while(true)
    {
        //Wait for any input to become available.
        _startBarrier.wait();

        bool queueEmpty = false;
        std::queue<SomeClass::ptr >* myThreadq(_threadQueue[id]);

        while(!queueEmpty)
        {

            SomeClass::ptr myWorkObject;

            //Make sure queue is not empty,
            //Caution: this is necessary if start barrier was triggered without queue input (e.g., shutdown) , which can happen.
            //Do not try to be smart and refactor this without knowing what you are doing!
            queueEmpty = myThreadq->empty();


            if(!queueEmpty)
            {
                chromosome = myThreadq->front();
                assert(myWorkObject);
                myThreadq->pop();
            }

            if(myWorkObject)
            {
                myWorkObject->doWorkIntenseStuffHere();
            }
        }

        //Wait until all worker threads have synchronized.
        _endBarrier.wait();

        if(_shutdown)
        {
            return;
        }
    }
}


void doWork(const myWorkObject::chromosome_container &refcontainer)
{

    if(!_started)
    {
        startWorkers();
    }

    unsigned int j = 0;
    for(myWorkObject::chromosome_container::const_iterator it = refcontainer.begin();
            it != refcontainer.end();++it)
    {
        if(!(*it)->hasFitness())
        {
            assert(*it);
            _threadQueue[j%_numWorkerThreads]->push(*it);
            j++;
        }
    }

    //Start Signal!
    _startBarrier.wait();

    //Wait for workers to be complete
    _endBarrier.wait();

}


    unsigned int getNumWorkerThreads() const
    {
        return _numWorkerThreads;
    }

    bool isStarted() const
    {
        return _started;
    }


private:

    boost::barrier _startBarrier;
    boost::barrier _endBarrier;

    bool _started;
    bool _shutdown;

    unsigned int _numWorkerThreads;

    std::vector<boost::thread*> _workerThreads;

    std::vector< std::queue<myWorkObject::ptr >* > _threadQueue;


};

2 ответа

Профилирование на основе выборки может дать вам довольно хорошее представление о том, испытываете ли вы ложные данные. Вот предыдущая ветка, которая описывает несколько способов решения этой проблемы. Я не думаю, что в этой теме упоминается Perf-утилита Linux. Это быстрый, простой и бесплатный способ подсчитать пропуски в кеше, которые могут подсказать вам, что вам нужно знать (испытываю ли я значительное количество пропусков в кеше, что соответствует тому, сколько раз я обращаюсь к определенной переменной?).

Если вы обнаружите, что ваша схема многопоточности может вызывать много пропусков конфликтов, вы можете попробовать объявить экземпляры myWorkObject или содержащиеся в них данные, которые вас действительно беспокоят __attribute__((aligned(64))) (выравнивание до 64 байтовых строк кэша).

Если вы работаете в Linux, есть инструмент под названием valgrind, с одним из модулей, выполняющих моделирование эффектов кэша (cachegrind). Пожалуйста, посмотрите на

http://valgrind.org/docs/manual/cg-manual.html

Другие вопросы по тегам