Параллельная_пайплайн не заканчивается

Я использую в своем коде функцию parallel_pipeline. Иногда, когда мое условие удовлетворяется, оно останавливает конвейер, а иногда - нет. Когда управление потоком вызывает остановку, даже после этого оно не завершается, а вызывает следующую часть и печатается на консоли, а затем вывод на консоль выглядит так, как будто он находится в бесконечном цикле и ничего не делает.

Код является:

#include <iostream> 
#include <sstream>
#include <fstream>
#include <vector>
#include <algorithm>
#include <tbb/pipeline.h>
#include <tbb/atomic.h>
#include <tbb/concurrent_queue.h>
#include <tbb/compat/thread>
#include <tbb/tbbmalloc_proxy.h>    
#include <tbb/tick_count.h>
using namespace std;
using namespace tbb;

#define pi 3.141593
#define FILTER_LEN  265

double coeffs[ FILTER_LEN ] =
{
  0.0033473431384214393,0.000032074683390218124,0.0033131082058404943,0.0024777666109278788,
  -0.0008968429179843104,-0.0031973449396977684,-0.003430943381749411,-0.0029796565504781646,
  -0.002770673157048994,-0.0022783059845596586,-0.0008531818129514857,0.001115432556294998,
  0.0026079871108133294,0.003012423848769931,0.002461420635709332,0.0014154004589753215,
  0.00025190669718400967,-0.0007608257014963959,-0.0013703600874774068,-0.0014133823230551277,
  -0.0009759556503342884,-0.00039687498737139273,-0.00007527524701314324,-0.00024181463305012626,
  -0.0008521761947454302,-0.00162618205097997,-0.002170446498273018,-0.002129903305507943,
  -0.001333859049002249,0.00010700092934983156,0.0018039564602637683,0.0032107930896349583,
  0.0038325849735515363,0.003416201274366522,0.002060848732332109,0.00017954815260431595,
  -0.0016358832300944531,-0.0028402136847527387,-0.0031256650498727384,-0.0025374271571154713,
  -0.001438370315670195,-0.00035115295209013755,0.0002606730012030533,0.0001969569787142967,
  -0.00039635535951198597,-0.0010886127490608972,-0.0013530057243606405,-0.0008123200399262436,
  0.0005730271959526784,0.0024419465938120906,0.004133717273258681,0.0049402122577746265,
  0.0043879285604252714,0.002449549610687005,-0.00040283102645093463,-0.003337730734820209,
  -0.0054508346511294775,-0.006093057767824609,-0.005117609782189977,-0.0029293645861970417,
  -0.0003251033117661085,0.0018074390555649442,0.0028351284091668164,0.002623563404428517,
  0.0015692864792199496,0.0004127664681096788,-0.00009249878881824428,0.0004690173244168184,
  0.001964334172374759,0.0037256715492873485,0.004809640399145206,0.004395274594482053,
  0.0021650921193604,-0.0014888595443799124,-0.005534807968511709,-0.008642334104607624,
  -0.009668950651149259,-0.008104732391434574,-0.004299972815463919,0.0006184612821881392,
  0.005136551428636121,0.007907786753766152,0.008241212326068366,0.00634786595941524,
  0.003235610213062744,0.00028882736660937287,-0.001320994685952108,-0.0011237433853145615,
  0.00044213409507615003,0.0022057106517524255,0.00277593527678719,0.0011909915058737617,
  -0.0025807757230413447,-0.007497632882437637,-0.011739520895818884,-0.013377018279057393,
  -0.011166543231844196,-0.005133056165990026,0.0032948631959114935,0.011673660427968408,
  0.017376415708412904,0.018548938130314566,0.014811760899506572,0.007450782505155853,
  -0.001019540069785369,-0.007805775815783898,-0.010898333714715424,-0.00985364043415772,
  -0.005988406030111452,-0.001818560524968024,0.000028552677472614846,-0.0019938756495376363,
  -0.007477684025727061,-0.013989430449615033,-0.017870518868849213,-0.015639422062597726,
  -0.005624959109456065,0.010993528170353541,0.03001263681283932,0.04527492462846608,
  0.050581340787164114,0.041949186532860346,0.019360612460662185,-0.012644336735920483,
  -0.0458782599058412,-0.07073838953156347,-0.0791205623455818,-0.06709535677423759,
  -0.03644544574795176,0.005505370370858695,0.04780486657828151,0.07898800597378192,
  0.0904453420042807,0.07898800597378192,0.04780486657828151,0.005505370370858695,
  -0.03644544574795176,-0.06709535677423759,-0.0791205623455818,-0.07073838953156347,
  -0.0458782599058412,-0.012644336735920483,0.019360612460662185,0.041949186532860346,
  0.050581340787164114,0.04527492462846608,0.03001263681283932,0.010993528170353541,
  -0.005624959109456065,-0.015639422062597726,-0.017870518868849213,-0.013989430449615033,
  -0.007477684025727061,-0.0019938756495376363,0.000028552677472614846,-0.001818560524968024,
  -0.005988406030111452,-0.00985364043415772,-0.010898333714715424,-0.007805775815783898,
  -0.001019540069785369,0.007450782505155853,0.014811760899506572,0.018548938130314566,
  0.017376415708412904,0.011673660427968408,0.0032948631959114935,-0.005133056165990026,
  -0.011166543231844196,-0.013377018279057393,-0.011739520895818884,-0.007497632882437637,
  -0.0025807757230413447,0.0011909915058737617,0.00277593527678719,0.0022057106517524255,
  0.00044213409507615003,-0.0011237433853145615,-0.001320994685952108,0.00028882736660937287,
  0.003235610213062744,0.00634786595941524,0.008241212326068366,0.007907786753766152,
  0.005136551428636121,0.0006184612821881392,-0.004299972815463919,-0.008104732391434574,
  -0.009668950651149259,-0.008642334104607624,-0.005534807968511709,-0.0014888595443799124,
  0.0021650921193604,0.004395274594482053,0.004809640399145206,0.0037256715492873485,
  0.001964334172374759,0.0004690173244168184,-0.00009249878881824428,0.0004127664681096788,
  0.0015692864792199496,0.002623563404428517,0.0028351284091668164,0.0018074390555649442,
  -0.0003251033117661085,-0.0029293645861970417,-0.005117609782189977,-0.006093057767824609,
  -0.0054508346511294775,-0.003337730734820209,-0.00040283102645093463,0.002449549610687005,
  0.0043879285604252714,0.0049402122577746265,0.004133717273258681,0.0024419465938120906,
  0.0005730271959526784,-0.0008123200399262436,-0.0013530057243606405,-0.0010886127490608972,
  -0.00039635535951198597,0.0001969569787142967,0.0002606730012030533,-0.00035115295209013755,
  -0.001438370315670195,-0.0025374271571154713,-0.0031256650498727384,-0.0028402136847527387,
  -0.0016358832300944531,0.00017954815260431595,0.002060848732332109,0.003416201274366522,
  0.0038325849735515363,0.0032107930896349583,0.0018039564602637683,0.00010700092934983156,
  -0.001333859049002249,-0.002129903305507943,-0.002170446498273018,-0.00162618205097997,
  -0.0008521761947454302,-0.00024181463305012626,-0.00007527524701314324,-0.00039687498737139273,
  -0.0009759556503342884,-0.0014133823230551277,-0.0013703600874774068,-0.0007608257014963959,
  0.00025190669718400967,0.0014154004589753215,0.002461420635709332,0.003012423848769931,
  0.0026079871108133294,0.001115432556294998,-0.0008531818129514857,-0.0022783059845596586,
  -0.002770673157048994,-0.0029796565504781646,-0.003430943381749411,-0.0031973449396977684,
  -0.0008968429179843104,0.0024777666109278788,0.0033131082058404943,0.000032074683390218124,
  0.0033473431384214393
};

class MyBuffer 
{
    public:
    double *acc;
    double *buffer;
    int start,end;
    static int j;

    MyBuffer()
    {
        start=0;
        end=0;

       buffer=new double[150264];
       acc=new double[150000];
       fill_n(buffer,150264,0);

    }

    ~MyBuffer()
    {
        delete[] buffer;
        delete[] acc;
    }

    int startnumber()
    {
        return start;
    }

    int endnumber()
    {
        return end;
    }

};

typedef concurrent_bounded_queue<MyBuffer>  QueueMyBufferType;
QueueMyBufferType chunk_queue;

atomic<bool> stop_flag;
atomic<bool> stop_filter;

int MyBuffer::j=0;
int queueloopcount=30;

void input_function()                               
{   
   stop_flag = false;     

   cout<<"thread reached to call input function " <<endl;
   ofstream o("testing sinewave.csv");

   int counter=0;
   while(counter<150000)    
  { 
    //  cout<<"value of counter is \t" <<counter << endl;

        MyBuffer *b=new MyBuffer;                                                       
        b->start=(FILTER_LEN-1+(counter));
        b->end=(5264+(counter));

    //  cout<<"value of b.start is and b.end is "<<b->start<<"\t" <<b->end<<endl;

        for(int i =b->startnumber(); i <b->endnumber(); i++)
         {
               b->buffer[i] = sin(700 * (2 * pi) * (i / 5000.0));
               o<<b->buffer[i]<<endl;   
         }
         chunk_queue.push(*b);

         counter+=5000;
        // cout<<"value of queueloopcount is "<< queueloopcount << endl;
     }

     cout<<"all data is perfectly generated" <<endl;
}

int main()
{

    int ntokens = 8;

    thread inputfunc(input_function);
    tick_count t1,t2;
    ofstream o("filter700Hz.csv");
    t1=tick_count::now();

     bool stop_pipeline = false;    
     stop_filter=false;


     inputfunc.join();

     parallel_pipeline(ntokens,make_filter<void,MyBuffer*>
     ( 
             filter::parallel,[&](flow_control& fc)->MyBuffer*
         {
             if(queueloopcount==0)
             {
                 fc.stop();
                 cout<<"pipeline stopped"<<endl;
             }
             else
             {
                  MyBuffer *b=new MyBuffer;
                  chunk_queue.pop(*b);
                 {
                     cout<<"value of start and end popped is "<<b->startnumber()<<"\t"<<b->endnumber()<<endl;
                     queueloopcount--;
                 }
                 return b;
             }
         }
     )&

    make_filter<MyBuffer*, void>
    (
        filter::serial,[&](MyBuffer* b)
        {
             cout<<"value of second filter start is and end is \t "<< b->startnumber() << "\t" << b->endnumber() <<endl;
        }
     )
    );

        cout<<"now i am out" <<endl;
    o.close();
    t2=tick_count::now();

    cout << "\n Time elapsed is \n\n" <<(t2-t1).seconds()<<endl;        

    return 0;
}

Пожалуйста, помогите найти неправильный код.

1 ответ

Решение

Проблема в этом коде связана с фильтром, т. Е. Параллельным на первом этапе, который вызывает проблему для объекта flow_control, который пытается всплыть, и это блокирующий вызов, из-за которого он блокируется, и решение этой проблемы заключается в том, что у вас, вероятно, должен быть последовательный первый фильтр, который только создайте пустой MyBuffer * и остановите конвейер, если больше нет работы. Затем установите параллельный второй фильтр, который выполняет реальную работу, и, наконец, последовательный (по порядку) выходной каскад.

Другие вопросы по тегам