C++ многопоточность и сродство

Я пишу простой пул потоков для своего приложения, который я тестирую на двухъядерном процессоре. Обычно это работает хорошо, но я заметил, что когда другие процессы используют более 50% процессора, мое приложение почти останавливается. Это сделало меня любопытным, поэтому я решил воспроизвести эту ситуацию и создал вспомогательное приложение, которое просто выполняет бесконечный цикл (без многопоточности), занимая 50% процессора. Во время работы вспомогательного приложения многопоточное приложение практически останавливается, как и раньше (скорость обработки падает с 300-400 задач в секунду до 5-10 задач в секунду). Но когда я изменил привязку процессов моей многопоточной программы к использованию только одного ядра (вспомогательное по-прежнему использует оба), оно начало работать, конечно, используя не более 50% оставшегося процессора. Когда я отключил многопоточность в своем приложении (все еще обрабатывая те же задачи, но без пула потоков), это работало как чудо, без какого-либо замедления от вспомогательного, который все еще работал (и именно так должны вести себя два приложения при работе на двух ядрах), Но когда я включаю многопоточность, проблема возвращается.

Я сделал специальный код для тестирования этого конкретного ThreadPool:

заголовок

#ifndef THREADPOOL_H_
#define THREADPOOL_H_

typedef double FloatingPoint;

#include <queue>
#include <vector>

#include <mutex>
#include <atomic>
#include <condition_variable>
#include <thread>

using namespace std;

struct ThreadTask
{
    int size;

    ThreadTask(int s)
    {
        size = s;
    }
    ~ThreadTask()
    {
    }
};

class ThreadPool
{
protected:
    queue<ThreadTask*> tasks;
    vector<std::thread> threads;
    std::condition_variable task_ready;
    std::mutex variable_mutex;
    std::mutex max_mutex;

    std::atomic<FloatingPoint> max;
    std::atomic<int> sleeping;
    std::atomic<bool> running;

    int threads_count;

    ThreadTask * getTask();
    void runWorker();
    void processTask(ThreadTask*);
    bool isQueueEmpty();
    bool isTaskAvailable();
    void threadMethod();
    void createThreads();
    void waitForThreadsToSleep();
public:
    ThreadPool(int);
    virtual ~ThreadPool();

    void addTask(int);
    void start();
    FloatingPoint getValue();

    void reset();
    void clearTasks();
};

#endif /* THREADPOOL_H_ */

и.cpp

#include "stdafx.h"
#include <climits>
#include <float.h>

#include "ThreadPool.h"

ThreadPool::ThreadPool(int t)
{
    running = true;
    threads_count = t;
    max = FLT_MIN;
    sleeping = 0;

    if(threads_count < 2)                                       //one worker thread has no sense
    {
        threads_count = (int)thread::hardware_concurrency();    //default value

        if(threads_count == 0)  //in case it fails ('If this value is not computable or well defined, the function returns 0')
            threads_count = 2;
    }

    printf("%d worker threads\n", threads_count);
}

ThreadPool::~ThreadPool()
{
    running = false;

    reset();                    //it will make sure that all worker threads are sleeping on condition variable
    task_ready.notify_all();    //let them finish in natural way

    for (auto& th : threads)
        th.join();
}

void ThreadPool::start()
{
    createThreads();
}

FloatingPoint ThreadPool::getValue()
{
    waitForThreadsToSleep();

    return max;
}

void ThreadPool::createThreads()
{
    threads.clear();

    for(int i = 0; i < threads_count; ++i)
        threads.push_back(std::thread(&ThreadPool::threadMethod, this));
}

void ThreadPool::threadMethod()
{
    while(running)
        runWorker();
}

void ThreadPool::runWorker()
{
    ThreadTask * task = getTask();
    processTask(task);
}

void ThreadPool::processTask(ThreadTask * task)
{
    if(task == NULL)
        return;

    //do something to simulate processing

    vector<int> v;

    for(int i = 0; i < task->size; ++i)
        v.push_back(i);

    delete task;
}

void ThreadPool::addTask(int s)
{
    ThreadTask * task = new ThreadTask(s);

    std::lock_guard<std::mutex> lock(variable_mutex);
    tasks.push(task);

    task_ready.notify_one();
}

ThreadTask * ThreadPool::getTask()
{
    std::unique_lock<std::mutex> lck(variable_mutex);

    if(tasks.empty())
    {
        ++sleeping;
        task_ready.wait(lck);
        --sleeping;
        if(tasks.empty())   //in case of ThreadPool being deleted (destructor calls notify_all), or spurious notifications
            return NULL;    //return to main loop and repeat it
    }

    ThreadTask * task = tasks.front();
    tasks.pop();

    return task;
}

bool ThreadPool::isQueueEmpty()
{
    std::lock_guard<std::mutex> lock(variable_mutex);

    return tasks.empty();
}

bool ThreadPool::isTaskAvailable()
{
    return !isQueueEmpty();
}

void ThreadPool::waitForThreadsToSleep()
{
    while(isTaskAvailable())
        std::this_thread::yield();  //wait for all tasks to be taken
    while(true) //wait for all threads to finish they last tasks
    {
        if(sleeping == threads_count)
            break;

        std::this_thread::yield();
    }
}

void ThreadPool::clearTasks()
{
    std::unique_lock<std::mutex> lock(variable_mutex);

    while(!tasks.empty()) tasks.pop();
}

void ThreadPool::reset()    //don't call this when var_mutex is already locked by this thread!
{
    clearTasks();

    waitForThreadsToSleep();

    max = FLT_MIN;
}

как это проверено:

ThreadPool tp(2);
tp.start();

int iterations = 1000;
int task_size = 1000;

for(int j = 0; j < iterations; ++j)
{
    printf("\r%d left", iterations - j);

    tp.reset();
    for(int i = 0; i < 1000; ++i)
        tp.addTask(task_size);

    tp.getValue();  
}


return 0;

Я собрал этот код с помощью mingw с gcc 4.8.1 ( отсюда) и Visual Studio 2012 (VC11) на Win7 64, оба на отладочной конфигурации.

Две программы, созданные с использованием упомянутых компиляторов, ведут себя совершенно по-разному.

а) сборка программ с помощью mingw работает намного быстрее, чем сборка на VS, когда она может занимать весь процессор (система показывает почти 100% использования процессора этим процессом, поэтому я не думаю, что mingw тайно устанавливает привязку к одному ядру). Но когда я запускаю вспомогательную программу (используя 50% ЦП), она сильно замедляется (примерно в несколько десятков раз). Загрузка ЦП в этом случае составляет около 50%-50% для основной программы и вспомогательной.

б) сборка программы с VS 2012, при использовании всего процессора, даже медленнее, чем а) ​​с замедлением (когда я установил task_size = 1, их скорости были похожи). Но когда вспомогательная программа работает, основная программа даже занимает большую часть ЦП (загрузка составляет около 66% основной - 33% вспомогательной), и в результате замедление едва заметно.

Когда установлено использование только одного ядра, обе программы заметно ускоряются (примерно в 1,5 - 2 раза), и mingw one перестает быть уязвимым для конкуренции.

Ну, теперь я не знаю, что делать. Моя программа ведет себя по-разному при сборке двумя разными наборами инструментов. Это недостаток в моем коде (который, предположим, является правдой), или что-то делать с компиляторами, имеющими проблемы с C++11?

0 ответов

Другие вопросы по тегам