Безопасное программирование потоков в симуляторах с использованием PIN

Я использую аппаратный симулятор, который использует инструменты PIN для выполнения рабочей нагрузки. В качестве рабочей нагрузки я использую следующий код. Хотя он работает в Ubuntu с флагом -lpthread, он останавливается на симуляторе, когда присоединяется к потокам.

Я думаю, что в этом коде есть что-то небезопасное, что может выдержать нативная ОС, а симулятор - нет. Каков наиболее подходящий способ кодировать это?

main.h:

#include <stdlib.h>
#include <iostream>
#include <fstream>
#include <string>
#include <pthread.h>
#include <stdint.h>
#include <getopt.h>
#include <set>
#include <vector>
#include <algorithm>
#include <iterator>

#define NUM_OF_VERTICES 4
#define NUM_OF_PTHREADS (NUM_OF_VERTICES*(NUM_OF_VERTICES-1)/2)

std::string payload_texts[NUM_OF_VERTICES];
void payload_text_initialize();
double indices [NUM_OF_VERTICES][NUM_OF_VERTICES];

class thread_args {   
    public: 
        uint index1, index2;
        unsigned  long value = 0;
        unsigned  long * valuePointer = &value;
};

main.cc:

#include "main.h"

extern "C" {
    extern void mcsim_skip_instrs_begin();
    extern void mcsim_skip_instrs_end();
    extern void mcsim_spinning_begin();
    extern void mcsim_spinning_end();
    int32_t log_2(uint64_t);
}

using namespace std;

set<char> find_uniques(string str){
    set<char> unique_chars;
    for (int i = 0 ; i < str.length() ; i++ ){
        char c = str.at(i);
        if (unique_chars.find(c) == unique_chars.end())
            unique_chars.insert(c);
    }
    return unique_chars;
}

void * jaccard_visit(void *arg){
    thread_args * args = (thread_args *) arg;
    set<char> setunion;
    set<char> intersect;

    set<char> set1 = find_uniques(payload_texts[args->index1]);
    set<char> set2 = find_uniques(payload_texts[args->index2]);


    std::set_intersection(set1.begin(),set1.end(),set2.begin(),set2.end(),std::inserter(intersect,intersect.begin()));
    std::set_union(set1.begin(),set1.end(),set2.begin(),set2.end(),std::inserter(setunion,setunion.begin()));

    double similarity = ((double) intersect.size()) / ((double) setunion.size());
    indices[args->index1][args->index2] = similarity;
    indices[args->index2][args->index1] = similarity;

    unsigned long a = 1;
    unsigned long b = 1;
    unsigned long c = a + b;
    for (int i = 3 ; i < 100000 * (similarity - 0.9) ; i++){
        a = b;
        b = c; 
        c = a + b;
    }

    *(args->valuePointer) = c;

    return NULL;
}

void execute_parallel(){      
    pthread_t threads[NUM_OF_PTHREADS]; //array to hold thread information 
    thread_args *th_args = (thread_args*) malloc(NUM_OF_PTHREADS * sizeof(thread_args)); 

    cout << "NUM_OF_PTHREADS is " << NUM_OF_PTHREADS << endl;
    uint k = 0 ;
    for (int i = 0 ; i < NUM_OF_VERTICES ; i++){
        for (int j = i+1 ; j < NUM_OF_VERTICES ; j++){
            th_args[k].index1 = i;
            th_args[k].index2 = j;
            th_args[k].value = i+j; 
            th_args[k].valuePointer = &(th_args[k].value);
            pthread_create(&threads[k], NULL, jaccard_visit, (void*) &th_args[k]);
            cout << "Thread " << k << " is started" << endl;
            k++;
        }
    }

    cout << "k is " << k << endl;

    for(int i = 0; i < NUM_OF_PTHREADS; i++){ 
        cout << "Thread " << i << " is joined" << endl;
        pthread_join(threads[i], NULL); 
    } 

    cout << "Free threads" << endl ;

    free(th_args); 

} 

void manual_schedule(){
    pthread_t th0, th1, th2, th3, th4, th5;
    thread_args arg0, arg1, arg2, arg3, arg4, arg5;

    arg0.index1 = 0;    arg0.index2 = 1;    arg0.value = 0; arg0.valuePointer = &arg0.value;
    arg1.index1 = 0;    arg1.index2 = 2;    arg1.value = 1; arg1.valuePointer = &arg1.value;
    arg2.index1 = 0;    arg2.index2 = 3;    arg2.value = 2; arg2.valuePointer = &arg2.value;
    arg3.index1 = 1;    arg3.index2 = 2;    arg3.value = 3; arg3.valuePointer = &arg3.value;
    arg4.index1 = 1;    arg4.index2 = 3;    arg4.value = 4; arg4.valuePointer = &arg4.value;
    arg5.index1 = 2;    arg5.index2 = 3;    arg5.value = 5; arg5.valuePointer = &arg5.value;

    cout << "Arguments are done ";

    pthread_create(&th0, NULL, jaccard_visit, (void*) &arg0);
    pthread_create(&th1, NULL, jaccard_visit, (void*) &arg1);
    pthread_create(&th2, NULL, jaccard_visit, (void*) &arg2);
    pthread_create(&th3, NULL, jaccard_visit, (void*) &arg3);
    pthread_create(&th4, NULL, jaccard_visit, (void*) &arg4);
    pthread_create(&th5, NULL, jaccard_visit, (void*) &arg5);

    cout << "Threads are created" << endl;

    cout << "Join starts here" << endl;
    pthread_join(th0, NULL); 
    pthread_join(th1, NULL); 
    pthread_join(th2, NULL); 
    pthread_join(th3, NULL); 
    pthread_join(th4, NULL); 
    pthread_join(th5, NULL); 

    cout << "Fibonaccis: " <<endl;
    cout << *(arg0.valuePointer) << endl;
    cout << *(arg1.valuePointer) << endl;
    cout << *(arg2.valuePointer) << endl;
    cout << *(arg3.valuePointer) << endl;
    cout << *(arg4.valuePointer) << endl;
    cout << *(arg5.valuePointer) << endl;
}

int main(int argc, const char * argv[]){
    cout << "Jaccard process is started"<<endl;
    mcsim_skip_instrs_begin();
    payload_text_initialize();
    mcsim_skip_instrs_end();

    cout << "Parallel part begins"<< endl;
    manual_schedule();

    cout << "Calculated results are being logged"<<endl;
    for (int i = 0 ; i < NUM_OF_VERTICES ; i++){
        for (int j = 0 ; j < NUM_OF_VERTICES ; j++){
            cout << indices[i][j] << " ";
        }
        cout << endl;
    }
}



void payload_text_initialize(){
    payload_texts[0] = "l5IC5uC9AzcROkE3YkDJ2lEzLts8XP8a9WqDgDLWjg1M7HysAUfDFwzLWjc7875PnZVUHLzi6nQaUMQDNUeG4Wn2UkiOB79tOlE1t6LaKYbYiCJwJ34CAOFZCIbFSmcLTAAoB1rvPfeA6oM3kV3C8BDvraGvXjUORLGFAcBRQCerb3WD0qhrrM0MVW0t93bBqlTsrkxg";
    payload_texts[1] = "tILKwAhbUkoqouKZ1G1VrZRmKwQnwzBgQirLkdedsYIAplKdEfk8oSmqdJmCJd5g0Q3VcJ8RYoxtIwA7jL1L01DcagIOuld0whcyM0yvSP0pMWO2yVTwOQPGkW2k7AHqzSEvb5BWkKsTexBsCUepjbG50T6vKsEHXGJ9aZwn2274Ekhnu1hlvuTqsS8jgwr0kQwhbwxN";
    payload_texts[2] = "LNyQgx3mox3szmRNn1tSB4ibVuLsTr7MfANlj41Y0hKStx3NJx1O52XxNiqTMDCu4eGwWYcBvFMEC5tl1E7Rsm0Q9NZsPAJIwuiPYQuXeUyhMmbFiwRk6PlziXne0QaFJ3TrncsHsL3LxIDyaDPScSRdEvX72IJmi2gQTHgASi0KkKH4Sr6VJV3FjdNjKwY2ncT5oSXZ";
    payload_texts[3] = "UxynTAvEWF4CcY9wUJRFnrX7sgrvvubcXUqH5DXK12UjSHDUME397S3BdB38FeMQJq8r7P7RILAY0qkw7OxUhGsZHRPmuY7VwKULqb6fx0Oy2McW2u07yqdAEMCN6AkQ1jTn2sXB4uWH21uLbjCf9i2V7W9tyw3cx6piE7XJb3vfbLI34OG5LKQXmVAGT0D6nbibaN8M";
}

execute_parallel() содержит 2 цикла for для создания и объединения потоков. manual_schedule() имеет развернутую версию того же кода. При выполнении по PIN-коду оба работают хорошо, пока не будет включена функция соединения первого потока. Когда происходит соединение, оно останавливается и остается таким же навсегда, без каких-либо сигналов или ошибок. Работая в Ubuntu с флагом -lpthread, он отлично работает и генерирует результаты.

Что может быть наиболее безопасным и подходящим способом реализации pthreads в этой ситуации?

заранее спасибо

редактировать

Я заметил, что программа зависает перед чтением payload_texts[args->index1], Добавление мьютекса помогло продолжить работу в этой точке. Также это работало одно время должным образом. Теперь он недетерминирован, в нескольких исполнениях одного и того же двоичного файла он редко заканчивается должным образом. Я думаю, что должна быть причина тупика внутри функции jaccard_visit. Я изменил это следующим образом:

void * jaccard_visit(void *arg){
    thread_args * args = (thread_args *) arg;
    set<char> setunion;
    set<char> intersect;
    int id = args->index1 * 10 + args->index2;

    pthread_mutex_lock(&cout_mutex);    cout << "Thread "<< id << " started with indices: " << args->index1 << " " << args->index2 << endl;     pthread_mutex_unlock(&cout_mutex);

    pthread_mutex_lock(&payload_mutex);
    set<char> set1 = find_uniques(payload_texts[args->index1]);
    set<char> set2 = find_uniques(payload_texts[args->index2]);
    pthread_mutex_unlock(&payload_mutex);

    pthread_mutex_lock(&cout_mutex);    cout << id << " : payload_texts were read" << endl;         pthread_mutex_unlock(&cout_mutex);
    pthread_mutex_lock(&cout_mutex);    cout << id << " : intersect was created, scan begins" << endl;         pthread_mutex_unlock(&cout_mutex);
    for (set<char>::iterator i = set1.begin(); i != set1.end(); i++) {
        char c1 = *i;
        for (set<char>::iterator j = set2.begin(); j != set2.end(); j++) {
            char c2 = *j;
            if (c1 == c2){
                intersect.insert(c1);
                pthread_mutex_lock(&cout_mutex);    cout << id << " : char" << c1 << " was inserted to intersection" << endl;         pthread_mutex_unlock(&cout_mutex);
                break;
            }
        }
    }
    pthread_mutex_lock(&cout_mutex);    cout << id << " : intersection is calculated" << endl;     pthread_mutex_unlock(&cout_mutex);
    for (set<char>::iterator i = set1.begin(); i != set1.end(); i++) {
        setunion.insert(*i);
    }
    for (set<char>::iterator i = set2.begin(); i != set2.end(); i++) {
        char c = *i;
        bool exists = false;
        for (set<char>::iterator j = set1.begin(); j != set1.end(); j++) {
            if (c == *j)
                exists = true;
        }
        if (exists == false)
            setunion.insert(c);
    }
    pthread_mutex_lock(&cout_mutex);    cout << id << " : union is calculated" << endl;            pthread_mutex_unlock(&cout_mutex);


    double similarity = ((double) intersect.size()) / ((double) setunion.size());
    pthread_mutex_lock(&cout_mutex);
    cout << id << " : similarity is calculated as " << similarity << endl;
    pthread_mutex_unlock(&cout_mutex);
    indices[args->index1][args->index2] = similarity;
    indices[args->index2][args->index1] = similarity;

    unsigned long a = 1;
    unsigned long b = 1;
    unsigned long c = a + b;

    pthread_mutex_lock(&cout_mutex);
    cout << id << " : fibonacci starts" << endl;
    pthread_mutex_unlock(&cout_mutex);

    for (int i = 3 ; i < 100000 * (similarity - 0.9) ; i++){
        a = b;
        b = c; 
        c = a + b;
    }
    *(args->valuePointer) = c;

    //pthread_exit(args->valuePointer);
    return NULL;
}

1 ответ

Решение

Наконец, я сделал это, выполнив следующие изменения в функции, которую выполняет каждый поток (jaccard_visit):

  1. Обернуть операции чтения глобальных переменных с помощью mutex_lock
  2. Удалить вызовы функций и реализовать их встроенными
  3. Избегайте использования set, вместо этого используйте строку или массив символов
  4. Отдельный пункт назначения и источник строковых функций

Следующий код работает довольно хорошо:

void * jaccard_visit(void *arg){
    thread_args * args = (thread_args *) arg;
    int id = args->id;
    pthread_mutex_lock(&cout_mutex);    cout << id << " : thread started" << endl;    pthread_mutex_unlock(&cout_mutex);

    pthread_mutex_lock(&payload_mutex);
    string str1 = my_graph[args->index1].payload_text;
    string str2 = my_graph[args->index2].payload_text;
    pthread_mutex_unlock(&payload_mutex);

    pthread_mutex_lock(&cout_mutex);    cout << id << " : payload texts are read" << endl;    pthread_mutex_unlock(&cout_mutex);

    int stringLength = str1.length() - 1;
    for (int i = 0; i < stringLength; i++) {
        for (int j = i + 1; j < stringLength;) {
            if (str1[i] == str1[j]) 
                str1[j] = str1[--stringLength];
            else   
                j++;
        }
    }
    string set1 = str1.substr(0, stringLength);

    pthread_mutex_lock(&cout_mutex);    cout << id << " : unique chars of first node were extracted" << endl;    pthread_mutex_unlock(&cout_mutex);

    stringLength = str2.length() - 1;
    for (int i = 0; i < stringLength; i++) {
        for (int j = i + 1; j < stringLength;) {
            if (str2[i] == str2[j]) 
                str2[j] = str2[--stringLength];
            else   
                j++;
        }
    }
    string set2 = str2.substr(0, stringLength);

    pthread_mutex_lock(&cout_mutex);    cout << id << " : unique chars of second node were extracted" << endl;    pthread_mutex_unlock(&cout_mutex);

    int intersection_index = 0;
    int union_index = 0;
    for (int i = 0 ; i < set1.length() ; i++){
        bool exists_in_set2 = false;
        for (int j = 0 ; j < set2.length() && exists_in_set2 == false; j++){
            if (set1[i] == set2[j]) {
                intersection_index ++;
                exists_in_set2 = true;
            }
        }
        if (!exists_in_set2) {
            union_index ++;
        }       
    }
    union_index += set2.length();

    pthread_mutex_lock(&cout_mutex);  cout << id << " : set1={" << set1 << "}, set2={" << set2 << "}" << endl;    pthread_mutex_unlock(&cout_mutex);
    pthread_mutex_lock(&cout_mutex);  cout << id << " : |n|=" << intersection_index << ", |u|=" << union_index <<  endl;    pthread_mutex_unlock(&cout_mutex);

    double similarity = ((double) intersection_index / union_index); 
    pthread_mutex_lock(&cout_mutex); cout<<id<<" : similarity is: " << similarity << endl;    pthread_mutex_unlock(&cout_mutex);

    pthread_mutex_lock(&indices_mutex); 
    my_graph[args->index1].jaccardList[args->index2] = similarity;
    my_graph[args->index2].jaccardList[args->index1] = similarity;
    pthread_mutex_unlock(&indices_mutex); 
    return NULL;

}
Другие вопросы по тегам