cufftSetStream вызывает вывод мусора. Я делаю что-то неправильно?
Согласно документам, cufftSetStream()
функция
Связывает поток CUDA с планом cuFFT. Все запуски ядра, сделанные во время выполнения плана, теперь выполняются через связанный поток [... пока...] поток изменяется с другим вызовом cufftSetStream().
К сожалению, результаты превращаются в мусор. Вот пример, который демонстрирует это, выполняя кучу преобразований двумя способами: один раз, когда каждый поток имеет свой собственный выделенный план, и один раз с повторным использованием одного плана, как указано в документации выше. Первый ведет себя, как и ожидалось, в подходе reused/cufftSetStream есть ошибки в большинстве преобразований. Это наблюдалось на двух картах, которые я пробовал (GTX 750 ti, Titan X) на CentOS 7 linux с инструментами компиляции Cuda, выпуск 7.0, V7.0.27; и выпуск 7.5, V7.5.17.
РЕДАКТИРОВАТЬ: см. Комментарии "FIX" ниже для одного способа исправить вещи.
#include <cufft.h>
#include <stdexcept>
#include <iostream>
#include <numeric>
#include <vector>
#define ck(cmd) if ( cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);}
__global__
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed)
{
for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y)
for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x)
buf[i*stride + j] = make_cuFloatComplex( (i+seed)%101 - 50,(j+seed)%41-20);
}
__global__
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors)
{
for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) {
for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) {
float e=buf1[i*stride+j] - buf2[i*stride+j];
if (e*e > 1) // gross error
atomicAdd(errors,1);
}
}
}
void demo(bool reuse_plan)
{
if (reuse_plan)
std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... ";
else
std::cout << "Giving each stream its own dedicated fft plan ... ";
int nfft = 1024;
int batch = 1024;
int nstreams = 8;
int nbins = nfft/2+1;
int nit=100;
size_t inpitch,outpitch;
std::vector<cufftComplex*> inbufs(nstreams);
std::vector<float*> outbufs(nstreams);
std::vector<float*> checkbufs(nstreams);
std::vector<cudaStream_t> streams(nstreams);
std::vector<cufftHandle> plans(nstreams);
for (int i=0;i<nstreams;++i) {
ck( cudaStreamCreate(&streams[i]));
ck( cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch) );
ck( cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch));
ck( cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch) );
if (i==0 || reuse_plan==false)
ck ( cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch) );
}
// fill the input buffers and FFT them to get a baseline for comparison
for (int i=0;i<nstreams;++i) {
fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i);
ck (cudaGetLastError());
if (reuse_plan) {
ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i]));
}else{
ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i]));
ck( cufftSetStream(plans[i],streams[i]) ); // only need to set the stream once
}
ck( cudaDeviceSynchronize());
}
// allocate a buffer for the error count
int * errors;
cudaMallocHost((void**)&errors,sizeof(int)*nit);
memset(errors,0,sizeof(int)*nit);
/* FIX: an event can protect the plan internal buffers
by serializing access to the plan
cudaEvent_t ev;
cudaEventCreateWithFlags(&ev,cudaEventDisableTiming);
*/
// perform the FFTs and check the outputs on streams
for (int it=0;it<nit;++it) {
int k = it % nstreams;
ck( cudaStreamSynchronize(streams[k]) ); // make sure any prior kernels have completed
if (reuse_plan) {
// FIX: ck(cudaStreamWaitEvent(streams[k],ev,0 ) );
ck(cufftSetStream(plans[0],streams[k]));
ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k]));
// FIX: ck(cudaEventRecord(ev,streams[k] ) );
}else{
ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k]));
}
check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]);
ck (cudaGetLastError());
}
ck(cudaDeviceSynchronize());
// report number of errors
int errcount=0;
for (int it=0;it<nit;++it)
if (errors[it])
++errcount;
std::cout << errcount << " of " << nit << " transforms had errors\n";
for (int i=0;i<nstreams;++i) {
cudaFree(inbufs[i]);
cudaFree(outbufs[i]);
cudaStreamDestroy(streams[i]);
if (i==0 || reuse_plan==false)
cufftDestroy(plans[i]);
}
}
int main(int argc,char ** argv)
{
demo(false);
demo(true);
return 0;
}
Типичный вывод
Давая каждому потоку свой собственный план FFT... 0 из 100 преобразований имели ошибки
Повторное использование одного и того же плана FFT с несколькими потоками через cufftSetStream ... 87 из 100 преобразований имели ошибки
1 ответ
Для повторного использования планов вам нужно вручную управлять рабочей областью cuFFT.
В каждом плане есть место для промежуточных результатов расчета. Если вы хотите использовать дескриптор плана одновременно для двух или более разных исполнений плана, вам нужно предоставить временный буфер для каждого одновременного вызова cufftExec*.
Вы можете сделать это с помощью cufftSetWorkArea - пожалуйста, ознакомьтесь с разделом 3.7 документации cuFFT. Раздел 2.2 также поможет понять, как это работает.
Вот обработанный пример, показывающий изменения в вашем коде для этого:
$ cat t1241.cu
#include <cufft.h>
#include <stdexcept>
#include <iostream>
#include <numeric>
#include <vector>
#define ck(cmd) if ( cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);}
__global__
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed)
{
for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y)
for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x)
buf[i*stride + j] = make_cuFloatComplex( (i+seed)%101 - 50,(j+seed)%41-20);
}
__global__
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors)
{
for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) {
for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) {
float e=buf1[i*stride+j] - buf2[i*stride+j];
if (e*e > 1) // gross error
atomicAdd(errors,1);
}
}
}
void demo(bool reuse_plan)
{
if (reuse_plan)
std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... ";
else
std::cout << "Giving each stream its own dedicated fft plan ... ";
int nfft = 1024;
int batch = 1024;
int nstreams = 8;
int nbins = nfft/2+1;
int nit=100;
size_t inpitch,outpitch;
std::vector<cufftComplex*> inbufs(nstreams);
std::vector<float*> outbufs(nstreams);
std::vector<float*> checkbufs(nstreams);
std::vector<cudaStream_t> streams(nstreams);
std::vector<cufftHandle> plans(nstreams);
// if plan reuse, set up independent work areas
std::vector<char *> wk_areas(nstreams);
for (int i=0;i<nstreams;++i) {
ck( cudaStreamCreate(&streams[i]));
ck( cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch) );
ck( cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch));
ck( cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch) );
if (i==0 || reuse_plan==false)
ck ( cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch) );
}
if (reuse_plan){
size_t ws;
ck(cufftGetSize(plans[0], &ws));
for (int i = 0; i < nstreams; i++)
ck(cudaMalloc(&(wk_areas[i]), ws));
ck(cufftSetAutoAllocation(plans[0], 0));
ck(cufftSetWorkArea(plans[0], wk_areas[0]));
}
// fill the input buffers and FFT them to get a baseline for comparison
for (int i=0;i<nstreams;++i) {
fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i);
ck (cudaGetLastError());
if (reuse_plan) {
ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i]));
}else{
ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i]));
ck( cufftSetStream(plans[i],streams[i]) ); // only need to set the stream once
}
ck( cudaDeviceSynchronize());
}
// allocate a buffer for the error count
int * errors;
cudaMallocHost((void**)&errors,sizeof(int)*nit);
memset(errors,0,sizeof(int)*nit);
// perform the FFTs and check the outputs on streams
for (int it=0;it<nit;++it) {
int k = it % nstreams;
ck( cudaStreamSynchronize(streams[k]) ); // make sure any prior kernels have completed
if (reuse_plan) {
ck(cufftSetStream(plans[0],streams[k]));
ck(cufftSetWorkArea(plans[0], wk_areas[k])); // update work area pointer in plan
ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k]));
}else{
ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k]));
}
check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]);
ck (cudaGetLastError());
}
ck(cudaDeviceSynchronize());
// report number of errors
int errcount=0;
for (int it=0;it<nit;++it)
if (errors[it])
++errcount;
std::cout << errcount << " of " << nit << " transforms had errors\n";
for (int i=0;i<nstreams;++i) {
cudaFree(inbufs[i]);
cudaFree(outbufs[i]);
cudaFree(wk_areas[i]);
cudaStreamDestroy(streams[i]);
if (i==0 || reuse_plan==false)
cufftDestroy(plans[i]);
}
}
int main(int argc,char ** argv)
{
demo(false);
demo(true);
return 0;
}
$ nvcc -o t1241 t1241.cu -lcufft
$ ./t1241
Giving each stream its own dedicated fft plan ... 0 of 100 transforms had errors
Reusing the same fft plan with multiple stream via cufftSetStream ... 0 of 100 transforms had errors
$