C++ параллель в месте сортировки radix
Я пытаюсь реализовать параллельную сортировку по радиусу с помощью radix 256. Мне кажется, функция srt хорошо работает в одном потоке. Но при использовании большего количества потоков иногда для случайных данных появляется ошибка: "Место чтения нарушения доступа", за которым следует адрес "маркера [индекса]" для функции srt. Он разрывается в строке 15 в функции srt, т.е. "tmp = marker[index]", где index имеет значение 63. Может ли кто-нибудь объяснить, что происходит?
inline
void count(unsigned* list, int size, int* histogram) {
for (int i = 0; i < size; ++i) {
++histogram[(list[i]>>24)];
}
}
void srt(int* histogram, unsigned** marker) {
static const int bin_size = 256;
int left = histogram[0];
int index;
int tmp;
while (left-- > 0) {
index = *marker[0] >> 24;
while (index != 0) {
tmp = *marker[index];
*marker[index]++ = *marker[0];
*marker[0] = tmp;
index = *marker[0] >> 24;
}
++marker[0];
}
for (int k = 1; k < bin_size; ++k) {
left = histogram[k] - (marker[k] - marker[k - 1]);
while (left-- > 0) {
index = *marker[k] >> 24;
while (index != k) {
tmp = *marker[index];
*marker[index]++ = *marker[k];
*marker[k] = tmp;
index = *marker[k] >> 24;
}
++marker[k];
}
}
}
void parallel_sort(unsigned* list, int size) {
//Build histogram
static const int bin_size = 256;
int histogram[bin_size] = { 0 };
int histogram1[bin_size] = { 0 };
int histogram2[bin_size] = { 0 };
int histogram3[bin_size] = { 0 };
const int partial_size = size / 4;
count(list, partial_size, histogram);
count(&list[partial_size], partial_size, histogram1);
count(&list[2 * partial_size], partial_size, histogram2);
count(&list[3 * partial_size], partial_size + (size % 4), histogram3);
unsigned int* marker[bin_size];
unsigned int* marker1[bin_size];
unsigned int* marker2[bin_size];
unsigned int* marker3[bin_size];
unsigned int* previous = list;
for ( int i = 0; i < bin_size; ++i ) {
marker[i] = previous;
marker1[i] = marker[i] + histogram[i];
marker2[i] = marker1[i] + histogram1[i];
marker3[i] = marker2[i] + histogram2[i];
previous = marker3[i] + histogram3[i];
}
//Breaks in srt in any of those threads
thread t21(srt, histogram1, marker1);
thread t22(srt, histogram2, marker2);
thread t23(srt, histogram3, marker3);
srt(histogram, marker);
t21.join();
t22.join();
t23.join();
//TODO
}
int main() {
const int size = 100000;
unsigned list[size];
srand(time(NULL));
for (int i = 0;i < size;++i) {
list[i] = rand()*rand();
}
parallel_sort(list, size);
}
Я использую i3 Dell, Windows 10, Visual Studio 2015 со следующими параметрами
/MP /GS /analyze- /W3 /Zc:wchar_t /ZI /Gm /Od /sdl /Fd"Debug\vc140.pdb" /Zc:inline /fp:precise /D "_MBCS" /errorReport:prompt /WX- /Zc:forScope /RTC1 /Gd /Oy- /MDd /Fa"Debug\" /EHsc /nologo /Fo"Debug\" /Fp"
2 ответа
Попробуйте приведенный ниже код для параллельного радиуса:
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>
enum errors {
INVALID_USAGE,
ERROR_OPENING_INPUT,
ERROR_OPENING_OUTPUT,
INVALID_FORMAT,
MEMORY_ERROR,
THREAD_CREATE_ERROR,
THREAD_JOIN_ERROR
};
const unsigned char KEY_LENGTH = 7;
const unsigned char CHARKEY_LENGTH = KEY_LENGTH + 1;
const unsigned short INPUT_SPACE = 256;
struct thread_info
{
unsigned char **input;
unsigned char **output;
unsigned short thread_idx;
unsigned int first_idx;
unsigned int last_idx;
pthread_barrier_t *barrier;
unsigned int *local_counters; // array of array of integer
unsigned int thread_count;
};
inline unsigned int coordinate(const unsigned int line)
{
return (line * CHARKEY_LENGTH);
}
// map each string of the memory array input to a pointer in output
inline void map_strings(unsigned char *const input, unsigned char
**output, const unsigned int size)
{
for(unsigned int i = 0; i < size; ++i){
output[i] = &(input[coordinate(i)]);
}
}
inline unsigned short core_count()
{
// Linux, Solaris, AIX, etc:
return (unsigned short)sysconf(_SC_NPROCESSORS_ONLN);
}
void print_usage(const char *const prog_name)
{
fprintf(stderr, "Usage: "
"%s input_file ouput_file\n", prog_name);
}
/*
counter the occurences of each char in input at radix i
first_idx and last_idx define the range of the input where the count
is needed
the result is stored in counter
*/
inline void count_char(const unsigned char *const *const input,
const unsigned int first_idx,
const unsigned int last_idx,
const unsigned int radix,
unsigned int *counter)
{
memset(counter, 0, INPUT_SPACE*sizeof(unsigned int));
for(unsigned int i = first_idx; i < last_idx; ++i){
const unsigned char c = input[i][radix];
++(counter[c]);
}
}
/* compute the offset of the current thread
local_counters: array of all coutners
thread_idx: current thread idx
thread_count: number of thread
offset: the array which is populated with the offset
*/
inline void compute_offset(const unsigned int *const local_counters,
const unsigned int thread_idx,
const unsigned int thread_count,
unsigned int *const offset){
unsigned int local_offset[INPUT_SPACE];
unsigned int global_counter[INPUT_SPACE];
for(unsigned int i = 0; i < INPUT_SPACE; ++i){
global_counter[i] = 0;
for(unsigned int thread_i = 0; thread_i < thread_count; ++thread_i)
{
if (thread_idx == thread_i){
local_offset[i] = global_counter[i];
}
const unsigned int value = local_counters[thread_i*INPUT_SPACE +
i];
global_counter[i] += value;
}
}
unsigned int previous_offset = 0;
for(unsigned int i = 1; i < INPUT_SPACE; ++i){
previous_offset += global_counter[i-1];
offset[i] = previous_offset + local_offset[i];
}
}
inline void sort_input(unsigned char *const *const input,
unsigned int *const offset,
const unsigned int first_idx,
const unsigned int last_idx,
const unsigned int radix,
unsigned char **const output){
for(unsigned int i = first_idx; i < last_idx; ++i){
const unsigned char c = input[i][radix];
const unsigned int current_offset = offset[c]++;
output[current_offset] = input[i];
}
}
void *concurrent_radix(void *arg)
{
thread_info *const info = (thread_info*)arg;
const unsigned short thread_idx = info->thread_idx;
const unsigned int first_idx = info->first_idx;
const unsigned int last_idx = info->last_idx;
const unsigned int thread_count = info->thread_count;
unsigned int *const local_counters = info->local_counters;
unsigned char **input = info->input;
unsigned char **output = info->output;
pthread_barrier_t *const barrier = info->barrier;
unsigned int *const counter = &(local_counters[thread_idx *
INPUT_SPACE]);
int radix = KEY_LENGTH - 1;
do {
count_char(input, first_idx, last_idx, radix, counter);
pthread_barrier_wait(barrier);
unsigned int offset[INPUT_SPACE];
compute_offset(local_counters, thread_idx, thread_count, offset);
sort_input(input, offset, first_idx, last_idx, radix, output);
pthread_barrier_wait(barrier);
unsigned char **const temp = input;
input = output;
output = temp;
--radix;
} while(radix >= 0);
return NULL;
}
// sort the radix index
inline void threaded_radix (unsigned char **input, unsigned char
**output, const unsigned int nb_keys)
{
const unsigned short nb_core = core_count();
pthread_t threads[nb_core];
thread_info threads_arg[nb_core];
unsigned int local_counters[nb_core * INPUT_SPACE];
pthread_barrier_t barrier;
pthread_barrier_init(&barrier, NULL, nb_core);
const unsigned int range = nb_keys / nb_core;
unsigned int last_idx = 0;
for (unsigned short i = 0; i < nb_core; ++i){
const unsigned int first_idx = last_idx;
last_idx = last_idx + range;
thread_info &info = threads_arg[i];
info.input = input;
info.output = output;
info.first_idx = first_idx;
info.last_idx = last_idx;
info.thread_idx = i;
info.thread_count = nb_core;
info.local_counters = local_counters;
info.barrier = &barrier;
}
threads_arg[nb_core-1].last_idx = nb_keys;
for (unsigned short i = 1; i < nb_core; ++i){
pthread_create(&threads[i], NULL, concurrent_radix, (void *)&
(threads_arg[i]));
}
concurrent_radix((void *)&(threads_arg[0]));
for (unsigned short i = 1; i < nb_core; ++i){
pthread_join(threads[i], NULL);
}
pthread_barrier_destroy(&barrier);
}
inline void radix_sort(unsigned char *input, unsigned char **output,
const unsigned int nb_keys)
{
unsigned char **buffer = (unsigned char **)malloc(nb_keys *
sizeof(unsigned char*));
map_strings(input, buffer, nb_keys);
threaded_radix(buffer, output, nb_keys);
free(buffer);
}
int main(const int argc, const char *const argv[])
{
if(argc < 3) {
print_usage(argv[0]);
return INVALID_USAGE;
}
// import the data in a table
FILE *input = fopen(argv[1], "r");
if(!input){
const int errsv = errno;
fprintf(stderr, "%s: %s\n", argv[1], strerror(errsv));
print_usage(argv[0]);
return ERROR_OPENING_INPUT;
}
unsigned int input_size;
{
int converted = fscanf(input, "%d\n", &input_size);
if(converted != 1){
fprintf(stderr, "Invalid file format");
return INVALID_FORMAT;
}
}
unsigned char *input_table = (unsigned char *)malloc(input_size *
CHARKEY_LENGTH * sizeof(unsigned char));
unsigned char **output_table = (unsigned char **)malloc(input_size *
sizeof(unsigned char*));
if(!input_table || !output_table){
fprintf(stderr, "Error: not enough memory\n");
return MEMORY_ERROR;
}
for(unsigned int i = 0; i<input_size; ++i){
unsigned char * key = &(input_table[coordinate(i)]);
size_t size_read = fread(key, 1, CHARKEY_LENGTH, input);
key[KEY_LENGTH] = '\0';
if(size_read != CHARKEY_LENGTH){
if(feof(input)){
assert(i == (input_size - 1));
} else {
fprintf(stderr, "Invalid file format");
return INVALID_FORMAT;
}
}
}
fclose(input);
// sort
struct timeval tick1, tick2;
gettimeofday(&tick1, NULL);
radix_sort(input_table, output_table, input_size);
gettimeofday(&tick2, NULL);
const double ellapsed = (tick2.tv_sec + tick2.tv_usec/1000000.0) -
(tick1.tv_sec + tick1.tv_usec/1000000.0);
printf("time for action = %g seconds\n", ellapsed);
// write the output
FILE *output = fopen(argv[2], "w+");
if(!input){
const int errsv = errno;
fprintf(stderr, "%s: %s\n", argv[2], strerror(errsv));
print_usage(argv[0]);
return ERROR_OPENING_OUTPUT;
}
fprintf(output, "%d\n", input_size);
for(unsigned int i = 0; i < input_size;++i){
fprintf(output, "%s\n", output_table[i]);
}
fclose(output);
free(input_table);
free(output_table);
return 0;
}
не забудьте добавить опцию -pthread для его компиляции.
Проблема здесь:
int main() {
const int size = 100000;
unsigned list[size];
srand(time(NULL));
for (int i = 0;i < size;++i) {
list[i] = rand()*rand();
}
parallel_sort(list, size);
}
Так что, если list[i] = rand()*rand();
приводит к числу, которое больше, чем size
какой 100 000? Ваши указатели marker
массивы воли абсолютно выходит из list
границ, что является причиной сбоя вашей программы.
Подводя итог, вы должны убедиться, что все значения для сортировки находятся под максимальным размером массива, так как вы используете основную сортировку.