Эффективный способ передачи сигналов и сохранения открытости?

У меня есть некоторый код, который пытается выполнить интенсивную матричную обработку, поэтому я подумал, что будет быстрее, если я буду многопоточным. Тем не менее, я намерен сохранить поток живым, чтобы в будущем его можно было использовать для дальнейшей обработки. Здесь проблема в том, что многопоточная версия кода работает медленнее, чем отдельный поток, и я считаю, что проблема заключается в том, как я сигнализирую / поддерживаю мои потоки живыми.

Я использую pthreads на Windows и C++. Вот мой код для потока, где runtest() - это функция, где выполняются матричные вычисления:

void* playQueue(void* arg)
{
    while(true)
    {
        pthread_mutex_lock(&queueLock);
        if(testQueue.empty())
            break;
        else
            testQueue.pop();
        pthread_mutex_unlock(&queueLock);
        runtest();
    }
    pthread_exit(NULL); 
}

Функция playQueue() - это функция, передаваемая pthread, и то, что у меня есть на данный момент, состоит в том, что есть очередь (testQueue), скажем, 1000 элементов, и есть 100 потоков. Каждый поток будет продолжать работать до тех пор, пока очередь не станет пустой (следовательно, содержимое внутри мьютекса).

Я полагаю, что причина того, что многопоточность работает так медленно, из-за того, что называется ложным разделением (я думаю?), И мой метод сигнализации потока для вызова runtest() и поддержания потока живым плохим.

Что может быть эффективным способом сделать это, чтобы многопоточная версия работала быстрее (или, по крайней мере, так же быстро), как итеративная версия?

ЗДЕСЬ ПОЛНАЯ ВЕРСИЯ МОЕГО КОДА (без матрицы)

# include <cstdlib>
# include <iostream>
# include <cmath>
# include <complex>
# include <string>
# include <pthread.h>
# include <queue>

using namespace std;

# include "matrix_exponential.hpp"
# include "test_matrix_exponential.hpp"
# include "c8lib.hpp"
# include "r8lib.hpp"

# define NUM_THREADS 3

int main ( );
int counter;
queue<int> testQueue;
queue<int> anotherQueue;
void *playQueue(void* arg);
void runtest();
void matrix_exponential_test01 ( );
void matrix_exponential_test02 ( );
pthread_mutex_t anotherLock;
pthread_mutex_t queueLock;
pthread_cond_t queue_cv;

int main ()

{
    counter = 0;

   /* for (int i=0;i<1; i++)
        for(int j=0; j<1000; j++)
        {
            runtest();
          cout << counter << endl;
        }*/

    pthread_t threads[NUM_THREADS];
    pthread_mutex_init(&queueLock, NULL);
    pthread_mutex_init(&anotherLock, NULL);
    pthread_cond_init (&queue_cv, NULL);
    for(int z=0; z<1000; z++)
    {
        testQueue.push(1);
    }
    for( int i=0; i < NUM_THREADS; i++ )
    {
       pthread_create(&threads[i], NULL, playQueue, (void*)NULL);
    }
    while(anotherQueue.size()<NUM_THREADS)
    {

    }
    cout << counter;
    pthread_mutex_destroy(&queueLock);
    pthread_cond_destroy(&queue_cv);
    pthread_cancel(NULL);
    cout << counter;
    return 0;
}

void* playQueue(void* arg)
{
    while(true)
    {
        cout<<counter<<endl;
        pthread_mutex_lock(&queueLock);
        if(testQueue.empty()){
                pthread_mutex_unlock(&queueLock);
            break;
        }
        else
            testQueue.pop();
        pthread_mutex_unlock(&queueLock);
        runtest();
    }
    pthread_mutex_lock(&anotherLock);
    anotherQueue.push(1);
    pthread_mutex_unlock(&anotherLock);
    pthread_exit(NULL);
}

void runtest()
{
      counter++;
      matrix_exponential_test01 ( );
      matrix_exponential_test02 ( );
}

Таким образом, здесь "matrix_exponential_tests" взяты с этого сайта с разрешения и там, где происходит вся математическая математика. Счетчик используется только для отладки и проверки работоспособности всех экземпляров.

1 ответ

Разве это не застряло?

while(true)
{
    pthread_mutex_lock(&queueLock);
    if(testQueue.empty())
        break; //<----------------you break without unlock the mutex...
    else
        testQueue.pop();
    pthread_mutex_unlock(&queueLock);
    runtest();
}

Раздел между блокировкой и разблокировкой работает медленнее, чем если бы он был в одном потоке.

мьютексы замедляют вас. вам следует заблокировать только критическую секцию, и если вы хотите ускорить ее, постарайтесь вообще не использовать mutex.

Вы можете сделать это, предоставив тест через аргумент функции, а не используя очередь.

Одним из способов избежать использования мьютекса является использование vector без удаления и std::atomic_int (C++11) в качестве индекса (или для блокировки только получения текущего индекса и приращения)

или используйте итератор так:

vector<test> testVector;
vector<test>::iterator it;
//when it initialized to:
it = testVector.begin();

Теперь ваш цикл может быть таким:

while(true)
{
    vector<test>::iterator it1;
    pthread_mutex_lock(&queueLock);
    it1 = (it==testVector.end())? it : it++; 
    pthread_mutex_unlock(&queueLock);

    //now you outside the critical section: 
    if(it==testVector.end())
        break; 
    //you don't delete or change the vector
    //so you can use the it1 iterator freely
    runtest();
}
Другие вопросы по тегам