Определение наименьшего элемента и его положения в каждом столбце матрицы с помощью CUDA Thrust
У меня довольно простая проблема, но я не могу найти элегантного решения.
У меня есть код Thrust, который выдает c
векторы одинакового размера, содержащие значения. Скажем, каждый из этих c
векторы имеют индекс. Я хотел бы для каждой позиции вектора, чтобы получить индекс c
вектор, для которого значение является наименьшим:
Пример:
C0 = (0,10,20,3,40)
C1 = (1,2 ,3 ,5,10)
Я бы получил в результате вектор, содержащий индекс C
вектор, который имеет наименьшее значение:
result = (0,1 ,1 ,0,1)
Я думал о том, чтобы сделать это с помощью итераторов Thrust Zip, но столкнулся с проблемами: я мог бы сжать все c
векторы и реализуют произвольное преобразование, которое принимает кортеж и возвращает индекс его наименьшего значения, но:
- Как перебрать содержимое кортежа?
- Как я понимаю, кортежи могут хранить только до
10
элементы и там может быть гораздо больше, чем10
c
векторы.
Затем я подумал сделать это так: вместо того, чтобы c
отдельные векторы, добавить их все в один вектор C
, затем сгенерируйте ключи, ссылающиеся на позиции, и выполните устойчивую сортировку по ключу, которая объединит векторные записи из одной позиции вместе. В примере, который дал бы:
C = (0,10,20,3,40,1,2,3,5,10)
keys = (0,1 ,2 ,3,4 ,0,1,2,3,4 )
after stable sort by key:
output = (0,1,10,2,20,3,3,5,40,10)
keys = (0,0,1 ,1,2 ,2,3,3,4 ,4 )
Затем сгенерируйте ключи с позициями в векторе, заархивируйте выходные данные с индексом c
векторов, а затем выполнить уменьшение по ключу с помощью специального функтора, который для каждого сокращения выводит индекс с наименьшим значением. В примере:
input = (0,1,10,2,20,3,3,5,40,10)
indexes= (0,1,0 ,1,0 ,1,0,1,0 ,1)
keys = (0,0,1 ,1,2 ,2,3,3,4 ,4)
after reduce by keys on zipped input and indexes:
output = (0,1,1,0,1)
Однако как написать такой функтор для операции сокращения по ключу?
3 ответа
Поскольку длина ваших векторов должна быть одинаковой. Лучше объединить их вместе и рассматривать их как матрицу C.
Тогда вашей задачей станет поиск индексов элемента min каждого столбца в матрице мажорной строки. Это можно решить следующим образом.
- сменить мажор на мажор;
- найти индексы для каждого столбца.
На шаге 1 вы предложили использовать stable_sort_by_key
изменить порядок элементов, который не является эффективным методом. Поскольку перестановка может быть вычислена напрямую с учетом #row и #col матрицы. В принципе, это можно сделать с помощью итераторов перестановок:
thrust::make_permutation_iterator(
c.begin(),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0),
(_1 % row) * col + _1 / row)
)
На шаге 2 reduce_by_key
может делать именно то, что вы хотите. В вашем случае сокращающий бинарный опциональный функтор прост, так как сравнение по кортежу (элемент вашего zip-вектора) уже определено для сравнения 1-го элемента кортежа, и оно поддерживается командой thrust as
thrust::minimum< thrust::tuple<float, int> >()
Вся программа показана следующим образом. Требуется Thrust 1.6.0+, поскольку я использую заполнители в причудливых итераторах.
#include <iterator>
#include <algorithm>
#include <thrust/device_vector.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/iterator/permutation_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/reduce.h>
#include <thrust/functional.h>
using namespace thrust::placeholders;
int main()
{
const int row = 2;
const int col = 5;
float initc[] =
{ 0, 10, 20, 3, 40, 1, 2, 3, 5, 10 };
thrust::device_vector<float> c(initc, initc + row * col);
thrust::device_vector<float> minval(col);
thrust::device_vector<int> minidx(col);
thrust::reduce_by_key(
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0),
_1 / row),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0),
_1 / row) + row * col,
thrust::make_zip_iterator(
thrust::make_tuple(
thrust::make_permutation_iterator(
c.begin(),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0), (_1 % row) * col + _1 / row)),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0), _1 % row))),
thrust::make_discard_iterator(),
thrust::make_zip_iterator(
thrust::make_tuple(
minval.begin(),
minidx.begin())),
thrust::equal_to<int>(),
thrust::minimum<thrust::tuple<float, int> >()
);
std::copy(minidx.begin(), minidx.end(), std::ostream_iterator<int>(std::cout, " "));
std::cout << std::endl;
return 0;
}
Две оставшиеся проблемы могут повлиять на производительность.
- минимальные значения должны быть выведены, что не требуется;
reduce_by_key
предназначен для сегментов с вариантами длины, он может быть не самым быстрым алгоритмом сокращения на сегментах с одинаковой длиной.
Написание собственного ядра может быть лучшим решением для достижения максимальной производительности.
Одна возможная идея, основанная на векторизованной идее сортировки здесь
Предположим, у меня есть такие векторы:
values: C = ( 0,10,20, 3,40, 1, 2, 3, 5,10) keys: K = ( 0, 1, 2, 3, 4, 0, 1, 2, 3, 4) segments: S = ( 0, 0, 0, 0, 0, 1, 1, 1, 1, 1)
молнии вместе K и S, чтобы создать KS
stable_sort_by_key с использованием C в качестве ключей и KS в качестве значений:
stable_sort_by_key(C.begin(), C.end(), KS_begin);
объединить переупорядоченные векторы C и K, чтобы создать CK
stable_sort_by_key, использующий переупорядоченный S в качестве ключей и CK в качестве значений:
stable_sort_by_key(S.begin(), S.end(), CK_begin);
использовать итератор перестановки или итератор с расширенным диапазоном для доступа к каждому N- му элементу (0, N, 2N, ...) вновь переупорядоченного вектора K, чтобы получить вектор индексов элемента min в каждом сегменте, где N - длина сегментов.
Я на самом деле не реализовал это, сейчас это просто идея. Может быть, это не будет работать по какой-то причине, которую я еще не наблюдал
segments
(S
) а также keys
(K
) эффективно являются индексами строк и столбцов.
И ваш вопрос кажется мне странным, потому что в вашем заголовке упоминается "найти индекс максимального значения", но большая часть вашего вопроса, похоже, относится к "минимальному значению". В любом случае, изменив шаг 6 моего алгоритма, вы можете найти любое значение.
У меня было любопытство, чтобы проверить, какой из предыдущих подходов был быстрее. Итак, я реализовал идею Роберта Кровеллы в приведенном ниже коде, который, для полноты картины, также описывает подход Эрика.
#include <iterator>
#include <algorithm>
#include <thrust/random.h>
#include <thrust/device_vector.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/iterator/permutation_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/reduce.h>
#include <thrust/functional.h>
#include <thrust/sort.h>
#include "TimingGPU.cuh"
using namespace thrust::placeholders;
template <typename Iterator>
class strided_range
{
public:
typedef typename thrust::iterator_difference<Iterator>::type difference_type;
struct stride_functor : public thrust::unary_function<difference_type,difference_type>
{
difference_type stride;
stride_functor(difference_type stride)
: stride(stride) {}
__host__ __device__
difference_type operator()(const difference_type& i) const
{
return stride * i;
}
};
typedef typename thrust::counting_iterator<difference_type> CountingIterator;
typedef typename thrust::transform_iterator<stride_functor, CountingIterator> TransformIterator;
typedef typename thrust::permutation_iterator<Iterator,TransformIterator> PermutationIterator;
// type of the strided_range iterator
typedef PermutationIterator iterator;
// construct strided_range for the range [first,last)
strided_range(Iterator first, Iterator last, difference_type stride)
: first(first), last(last), stride(stride) {}
iterator begin(void) const
{
return PermutationIterator(first, TransformIterator(CountingIterator(0), stride_functor(stride)));
}
iterator end(void) const
{
return begin() + ((last - first) + (stride - 1)) / stride;
}
protected:
Iterator first;
Iterator last;
difference_type stride;
};
/**************************************************************/
/* CONVERT LINEAR INDEX TO ROW INDEX - NEEDED FOR APPROACH #1 */
/**************************************************************/
template< typename T >
struct mod_functor {
__host__ __device__ T operator()(T a, T b) { return a % b; }
};
/********/
/* MAIN */
/********/
int main()
{
/***********************/
/* SETTING THE PROBLEM */
/***********************/
const int Nrows = 200;
const int Ncols = 200;
// --- Random uniform integer distribution between 10 and 99
thrust::default_random_engine rng;
thrust::uniform_int_distribution<int> dist(10, 99);
// --- Matrix allocation and initialization
thrust::device_vector<float> d_matrix(Nrows * Ncols);
for (size_t i = 0; i < d_matrix.size(); i++) d_matrix[i] = (float)dist(rng);
TimingGPU timerGPU;
/******************/
/* APPROACH NR. 1 */
/******************/
timerGPU.StartCounter();
thrust::device_vector<float> d_min_values(Ncols);
thrust::device_vector<int> d_min_indices_1(Ncols);
thrust::reduce_by_key(
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0),
_1 / Nrows),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0),
_1 / Nrows) + Nrows * Ncols,
thrust::make_zip_iterator(
thrust::make_tuple(
thrust::make_permutation_iterator(
d_matrix.begin(),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0), (_1 % Nrows) * Ncols + _1 / Nrows)),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0), _1 % Nrows))),
thrust::make_discard_iterator(),
thrust::make_zip_iterator(
thrust::make_tuple(
d_min_values.begin(),
d_min_indices_1.begin())),
thrust::equal_to<int>(),
thrust::minimum<thrust::tuple<float, int> >()
);
printf("Timing for approach #1 = %f\n", timerGPU.GetCounter());
/******************/
/* APPROACH NR. 2 */
/******************/
timerGPU.StartCounter();
// --- Computing row indices vector
thrust::device_vector<int> d_row_indices(Nrows * Ncols);
thrust::transform(thrust::make_counting_iterator(0), thrust::make_counting_iterator(Nrows * Ncols), thrust::make_constant_iterator(Ncols), d_row_indices.begin(), thrust::divides<int>() );
// --- Computing column indices vector
thrust::device_vector<int> d_column_indices(Nrows * Ncols);
thrust::transform(thrust::make_counting_iterator(0), thrust::make_counting_iterator(Nrows * Ncols), thrust::make_constant_iterator(Ncols), d_column_indices.begin(), mod_functor<int>());
// --- int and float iterators
typedef thrust::device_vector<int>::iterator IntIterator;
typedef thrust::device_vector<float>::iterator FloatIterator;
// --- Relevant tuples of int and float iterators
typedef thrust::tuple<IntIterator, IntIterator> IteratorTuple1;
typedef thrust::tuple<FloatIterator, IntIterator> IteratorTuple2;
// --- zip_iterator of the relevant tuples
typedef thrust::zip_iterator<IteratorTuple1> ZipIterator1;
typedef thrust::zip_iterator<IteratorTuple2> ZipIterator2;
// --- zip_iterator creation
ZipIterator1 iter1(thrust::make_tuple(d_column_indices.begin(), d_row_indices.begin()));
thrust::stable_sort_by_key(d_matrix.begin(), d_matrix.end(), iter1);
ZipIterator2 iter2(thrust::make_tuple(d_matrix.begin(), d_row_indices.begin()));
thrust::stable_sort_by_key(d_column_indices.begin(), d_column_indices.end(), iter2);
typedef thrust::device_vector<int>::iterator Iterator;
// --- Strided access to the sorted array
strided_range<Iterator> d_min_indices_2(d_row_indices.begin(), d_row_indices.end(), Nrows);
printf("Timing for approach #2 = %f\n", timerGPU.GetCounter());
printf("\n\n");
std::copy(d_min_indices_2.begin(), d_min_indices_2.end(), std::ostream_iterator<int>(std::cout, " "));
std::cout << std::endl;
return 0;
}
Тестирование двух подходов для случая 2000x2000
размер матрицы, это был результат на карте Kepler K20c:
Eric's : 8.4s
Robert Crovella's : 33.4s