cufftSetStream вызывает вывод мусора. Я делаю что-то неправильно?

Согласно документам, cufftSetStream() функция

Связывает поток CUDA с планом cuFFT. Все запуски ядра, сделанные во время выполнения плана, теперь выполняются через связанный поток [... пока...] поток изменяется с другим вызовом cufftSetStream().

К сожалению, результаты превращаются в мусор. Вот пример, который демонстрирует это, выполняя кучу преобразований двумя способами: один раз, когда каждый поток имеет свой собственный выделенный план, и один раз с повторным использованием одного плана, как указано в документации выше. Первый ведет себя, как и ожидалось, в подходе reused/cufftSetStream есть ошибки в большинстве преобразований. Это наблюдалось на двух картах, которые я пробовал (GTX 750 ti, Titan X) на CentOS 7 linux с инструментами компиляции Cuda, выпуск 7.0, V7.0.27; и выпуск 7.5, V7.5.17.

РЕДАКТИРОВАТЬ: см. Комментарии "FIX" ниже для одного способа исправить вещи.

#include <cufft.h>
#include <stdexcept>
#include <iostream>
#include <numeric>
#include <vector>

#define ck(cmd) if ( cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);}


__global__
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y)
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x)
            buf[i*stride + j] = make_cuFloatComplex( (i+seed)%101 - 50,(j+seed)%41-20);
}

__global__
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) {
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) {
            float e=buf1[i*stride+j] - buf2[i*stride+j];
            if (e*e > 1) // gross error
                atomicAdd(errors,1);
        }
    }
}

void demo(bool reuse_plan)
{
    if (reuse_plan)
        std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... ";
    else
        std::cout << "Giving each stream its own dedicated fft plan ... ";
    int nfft = 1024;
    int batch = 1024;
    int nstreams = 8;
    int nbins = nfft/2+1;
    int nit=100;
    size_t inpitch,outpitch;

    std::vector<cufftComplex*> inbufs(nstreams);
    std::vector<float*> outbufs(nstreams);
    std::vector<float*> checkbufs(nstreams);
    std::vector<cudaStream_t> streams(nstreams);
    std::vector<cufftHandle> plans(nstreams);
    for (int i=0;i<nstreams;++i) {
        ck( cudaStreamCreate(&streams[i]));
        ck( cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch) );
        ck( cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch));
        ck( cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch) );
        if (i==0 || reuse_plan==false)
            ck ( cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch) );
    }

    // fill the input buffers and FFT them to get a baseline for comparison
    for (int i=0;i<nstreams;++i) {
        fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i);
        ck (cudaGetLastError());
        if (reuse_plan) {
            ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i]));
        }else{
            ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i]));
            ck( cufftSetStream(plans[i],streams[i]) ); // only need to set the stream once
        }
        ck( cudaDeviceSynchronize());
    }
    // allocate a buffer for the error count
    int * errors;
    cudaMallocHost((void**)&errors,sizeof(int)*nit);
    memset(errors,0,sizeof(int)*nit);

    /* FIX: an event can protect the plan internal buffers 
    by serializing access to the plan
    cudaEvent_t ev;
    cudaEventCreateWithFlags(&ev,cudaEventDisableTiming);
    */

    // perform the FFTs and check the outputs on streams
    for (int it=0;it<nit;++it) {
        int k = it % nstreams;
        ck( cudaStreamSynchronize(streams[k]) ); // make sure any prior kernels have completed
        if (reuse_plan) {
            // FIX: ck(cudaStreamWaitEvent(streams[k],ev,0 ) );
            ck(cufftSetStream(plans[0],streams[k]));
            ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k]));
            // FIX: ck(cudaEventRecord(ev,streams[k] ) );
        }else{
            ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k]));
        }
        check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]);
        ck (cudaGetLastError());
    }
    ck(cudaDeviceSynchronize());

    // report number of errors
    int errcount=0;
    for (int it=0;it<nit;++it)
        if (errors[it])
            ++errcount;
    std::cout << errcount << " of " << nit << " transforms had errors\n";

    for (int i=0;i<nstreams;++i) {
        cudaFree(inbufs[i]);
        cudaFree(outbufs[i]);
        cudaStreamDestroy(streams[i]);
        if (i==0 || reuse_plan==false)
            cufftDestroy(plans[i]);
    }
}

int main(int argc,char ** argv)
{
    demo(false);
    demo(true);
    return 0;
}

Типичный вывод

Давая каждому потоку свой собственный план FFT... 0 из 100 преобразований имели ошибки
Повторное использование одного и того же плана FFT с несколькими потоками через cufftSetStream ... 87 из 100 преобразований имели ошибки

1 ответ

Решение

Для повторного использования планов вам нужно вручную управлять рабочей областью cuFFT.

В каждом плане есть место для промежуточных результатов расчета. Если вы хотите использовать дескриптор плана одновременно для двух или более разных исполнений плана, вам нужно предоставить временный буфер для каждого одновременного вызова cufftExec*.

Вы можете сделать это с помощью cufftSetWorkArea - пожалуйста, ознакомьтесь с разделом 3.7 документации cuFFT. Раздел 2.2 также поможет понять, как это работает.

Вот обработанный пример, показывающий изменения в вашем коде для этого:

$ cat t1241.cu
#include <cufft.h>
#include <stdexcept>
#include <iostream>
#include <numeric>
#include <vector>

#define ck(cmd) if ( cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);}


__global__
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y)
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x)
            buf[i*stride + j] = make_cuFloatComplex( (i+seed)%101 - 50,(j+seed)%41-20);
}

__global__
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors)
{
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) {
        for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) {
            float e=buf1[i*stride+j] - buf2[i*stride+j];
            if (e*e > 1) // gross error
                atomicAdd(errors,1);
        }
    }
}

void demo(bool reuse_plan)
{
    if (reuse_plan)
        std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... ";
    else
        std::cout << "Giving each stream its own dedicated fft plan ... ";
    int nfft = 1024;
    int batch = 1024;
    int nstreams = 8;
    int nbins = nfft/2+1;
    int nit=100;
    size_t inpitch,outpitch;

    std::vector<cufftComplex*> inbufs(nstreams);
    std::vector<float*> outbufs(nstreams);
    std::vector<float*> checkbufs(nstreams);
    std::vector<cudaStream_t> streams(nstreams);
    std::vector<cufftHandle> plans(nstreams);
    // if plan reuse, set up independent work areas
    std::vector<char *> wk_areas(nstreams);
    for (int i=0;i<nstreams;++i) {
        ck( cudaStreamCreate(&streams[i]));
        ck( cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch) );
        ck( cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch));
        ck( cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch) );
        if (i==0 || reuse_plan==false)
            ck ( cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch) );
    }
    if (reuse_plan){
      size_t ws;
      ck(cufftGetSize(plans[0], &ws));
      for (int i = 0; i < nstreams; i++)
        ck(cudaMalloc(&(wk_areas[i]), ws));
      ck(cufftSetAutoAllocation(plans[0], 0));
      ck(cufftSetWorkArea(plans[0], wk_areas[0]));
      }
    // fill the input buffers and FFT them to get a baseline for comparison
    for (int i=0;i<nstreams;++i) {
        fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i);
        ck (cudaGetLastError());
        if (reuse_plan) {
            ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i]));
        }else{
            ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i]));
            ck( cufftSetStream(plans[i],streams[i]) ); // only need to set the stream once
        }
        ck( cudaDeviceSynchronize());
    }
    // allocate a buffer for the error count
    int * errors;
    cudaMallocHost((void**)&errors,sizeof(int)*nit);
    memset(errors,0,sizeof(int)*nit);

    // perform the FFTs and check the outputs on streams
    for (int it=0;it<nit;++it) {
        int k = it % nstreams;
        ck( cudaStreamSynchronize(streams[k]) ); // make sure any prior kernels have completed
        if (reuse_plan) {
            ck(cufftSetStream(plans[0],streams[k]));
            ck(cufftSetWorkArea(plans[0], wk_areas[k])); // update work area pointer in plan
            ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k]));
        }else{
            ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k]));
        }
        check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]);
        ck (cudaGetLastError());
    }
    ck(cudaDeviceSynchronize());

    // report number of errors
    int errcount=0;
    for (int it=0;it<nit;++it)
        if (errors[it])
            ++errcount;
    std::cout << errcount << " of " << nit << " transforms had errors\n";

    for (int i=0;i<nstreams;++i) {
        cudaFree(inbufs[i]);
        cudaFree(outbufs[i]);
        cudaFree(wk_areas[i]);
        cudaStreamDestroy(streams[i]);
        if (i==0 || reuse_plan==false)
            cufftDestroy(plans[i]);
    }
}

int main(int argc,char ** argv)
{
    demo(false);
    demo(true);
    return 0;
}
$ nvcc -o t1241 t1241.cu -lcufft
$ ./t1241
Giving each stream its own dedicated fft plan ... 0 of 100 transforms had errors
Reusing the same fft plan with multiple stream via cufftSetStream ... 0 of 100 transforms had errors
$
Другие вопросы по тегам