Параллельная_пайплайн не заканчивается
Я использую в своем коде функцию parallel_pipeline. Иногда, когда мое условие удовлетворяется, оно останавливает конвейер, а иногда - нет. Когда управление потоком вызывает остановку, даже после этого оно не завершается, а вызывает следующую часть и печатается на консоли, а затем вывод на консоль выглядит так, как будто он находится в бесконечном цикле и ничего не делает.
Код является:
#include <iostream>
#include <sstream>
#include <fstream>
#include <vector>
#include <algorithm>
#include <tbb/pipeline.h>
#include <tbb/atomic.h>
#include <tbb/concurrent_queue.h>
#include <tbb/compat/thread>
#include <tbb/tbbmalloc_proxy.h>
#include <tbb/tick_count.h>
using namespace std;
using namespace tbb;
#define pi 3.141593
#define FILTER_LEN 265
double coeffs[ FILTER_LEN ] =
{
0.0033473431384214393,0.000032074683390218124,0.0033131082058404943,0.0024777666109278788,
-0.0008968429179843104,-0.0031973449396977684,-0.003430943381749411,-0.0029796565504781646,
-0.002770673157048994,-0.0022783059845596586,-0.0008531818129514857,0.001115432556294998,
0.0026079871108133294,0.003012423848769931,0.002461420635709332,0.0014154004589753215,
0.00025190669718400967,-0.0007608257014963959,-0.0013703600874774068,-0.0014133823230551277,
-0.0009759556503342884,-0.00039687498737139273,-0.00007527524701314324,-0.00024181463305012626,
-0.0008521761947454302,-0.00162618205097997,-0.002170446498273018,-0.002129903305507943,
-0.001333859049002249,0.00010700092934983156,0.0018039564602637683,0.0032107930896349583,
0.0038325849735515363,0.003416201274366522,0.002060848732332109,0.00017954815260431595,
-0.0016358832300944531,-0.0028402136847527387,-0.0031256650498727384,-0.0025374271571154713,
-0.001438370315670195,-0.00035115295209013755,0.0002606730012030533,0.0001969569787142967,
-0.00039635535951198597,-0.0010886127490608972,-0.0013530057243606405,-0.0008123200399262436,
0.0005730271959526784,0.0024419465938120906,0.004133717273258681,0.0049402122577746265,
0.0043879285604252714,0.002449549610687005,-0.00040283102645093463,-0.003337730734820209,
-0.0054508346511294775,-0.006093057767824609,-0.005117609782189977,-0.0029293645861970417,
-0.0003251033117661085,0.0018074390555649442,0.0028351284091668164,0.002623563404428517,
0.0015692864792199496,0.0004127664681096788,-0.00009249878881824428,0.0004690173244168184,
0.001964334172374759,0.0037256715492873485,0.004809640399145206,0.004395274594482053,
0.0021650921193604,-0.0014888595443799124,-0.005534807968511709,-0.008642334104607624,
-0.009668950651149259,-0.008104732391434574,-0.004299972815463919,0.0006184612821881392,
0.005136551428636121,0.007907786753766152,0.008241212326068366,0.00634786595941524,
0.003235610213062744,0.00028882736660937287,-0.001320994685952108,-0.0011237433853145615,
0.00044213409507615003,0.0022057106517524255,0.00277593527678719,0.0011909915058737617,
-0.0025807757230413447,-0.007497632882437637,-0.011739520895818884,-0.013377018279057393,
-0.011166543231844196,-0.005133056165990026,0.0032948631959114935,0.011673660427968408,
0.017376415708412904,0.018548938130314566,0.014811760899506572,0.007450782505155853,
-0.001019540069785369,-0.007805775815783898,-0.010898333714715424,-0.00985364043415772,
-0.005988406030111452,-0.001818560524968024,0.000028552677472614846,-0.0019938756495376363,
-0.007477684025727061,-0.013989430449615033,-0.017870518868849213,-0.015639422062597726,
-0.005624959109456065,0.010993528170353541,0.03001263681283932,0.04527492462846608,
0.050581340787164114,0.041949186532860346,0.019360612460662185,-0.012644336735920483,
-0.0458782599058412,-0.07073838953156347,-0.0791205623455818,-0.06709535677423759,
-0.03644544574795176,0.005505370370858695,0.04780486657828151,0.07898800597378192,
0.0904453420042807,0.07898800597378192,0.04780486657828151,0.005505370370858695,
-0.03644544574795176,-0.06709535677423759,-0.0791205623455818,-0.07073838953156347,
-0.0458782599058412,-0.012644336735920483,0.019360612460662185,0.041949186532860346,
0.050581340787164114,0.04527492462846608,0.03001263681283932,0.010993528170353541,
-0.005624959109456065,-0.015639422062597726,-0.017870518868849213,-0.013989430449615033,
-0.007477684025727061,-0.0019938756495376363,0.000028552677472614846,-0.001818560524968024,
-0.005988406030111452,-0.00985364043415772,-0.010898333714715424,-0.007805775815783898,
-0.001019540069785369,0.007450782505155853,0.014811760899506572,0.018548938130314566,
0.017376415708412904,0.011673660427968408,0.0032948631959114935,-0.005133056165990026,
-0.011166543231844196,-0.013377018279057393,-0.011739520895818884,-0.007497632882437637,
-0.0025807757230413447,0.0011909915058737617,0.00277593527678719,0.0022057106517524255,
0.00044213409507615003,-0.0011237433853145615,-0.001320994685952108,0.00028882736660937287,
0.003235610213062744,0.00634786595941524,0.008241212326068366,0.007907786753766152,
0.005136551428636121,0.0006184612821881392,-0.004299972815463919,-0.008104732391434574,
-0.009668950651149259,-0.008642334104607624,-0.005534807968511709,-0.0014888595443799124,
0.0021650921193604,0.004395274594482053,0.004809640399145206,0.0037256715492873485,
0.001964334172374759,0.0004690173244168184,-0.00009249878881824428,0.0004127664681096788,
0.0015692864792199496,0.002623563404428517,0.0028351284091668164,0.0018074390555649442,
-0.0003251033117661085,-0.0029293645861970417,-0.005117609782189977,-0.006093057767824609,
-0.0054508346511294775,-0.003337730734820209,-0.00040283102645093463,0.002449549610687005,
0.0043879285604252714,0.0049402122577746265,0.004133717273258681,0.0024419465938120906,
0.0005730271959526784,-0.0008123200399262436,-0.0013530057243606405,-0.0010886127490608972,
-0.00039635535951198597,0.0001969569787142967,0.0002606730012030533,-0.00035115295209013755,
-0.001438370315670195,-0.0025374271571154713,-0.0031256650498727384,-0.0028402136847527387,
-0.0016358832300944531,0.00017954815260431595,0.002060848732332109,0.003416201274366522,
0.0038325849735515363,0.0032107930896349583,0.0018039564602637683,0.00010700092934983156,
-0.001333859049002249,-0.002129903305507943,-0.002170446498273018,-0.00162618205097997,
-0.0008521761947454302,-0.00024181463305012626,-0.00007527524701314324,-0.00039687498737139273,
-0.0009759556503342884,-0.0014133823230551277,-0.0013703600874774068,-0.0007608257014963959,
0.00025190669718400967,0.0014154004589753215,0.002461420635709332,0.003012423848769931,
0.0026079871108133294,0.001115432556294998,-0.0008531818129514857,-0.0022783059845596586,
-0.002770673157048994,-0.0029796565504781646,-0.003430943381749411,-0.0031973449396977684,
-0.0008968429179843104,0.0024777666109278788,0.0033131082058404943,0.000032074683390218124,
0.0033473431384214393
};
class MyBuffer
{
public:
double *acc;
double *buffer;
int start,end;
static int j;
MyBuffer()
{
start=0;
end=0;
buffer=new double[150264];
acc=new double[150000];
fill_n(buffer,150264,0);
}
~MyBuffer()
{
delete[] buffer;
delete[] acc;
}
int startnumber()
{
return start;
}
int endnumber()
{
return end;
}
};
typedef concurrent_bounded_queue<MyBuffer> QueueMyBufferType;
QueueMyBufferType chunk_queue;
atomic<bool> stop_flag;
atomic<bool> stop_filter;
int MyBuffer::j=0;
int queueloopcount=30;
void input_function()
{
stop_flag = false;
cout<<"thread reached to call input function " <<endl;
ofstream o("testing sinewave.csv");
int counter=0;
while(counter<150000)
{
// cout<<"value of counter is \t" <<counter << endl;
MyBuffer *b=new MyBuffer;
b->start=(FILTER_LEN-1+(counter));
b->end=(5264+(counter));
// cout<<"value of b.start is and b.end is "<<b->start<<"\t" <<b->end<<endl;
for(int i =b->startnumber(); i <b->endnumber(); i++)
{
b->buffer[i] = sin(700 * (2 * pi) * (i / 5000.0));
o<<b->buffer[i]<<endl;
}
chunk_queue.push(*b);
counter+=5000;
// cout<<"value of queueloopcount is "<< queueloopcount << endl;
}
cout<<"all data is perfectly generated" <<endl;
}
int main()
{
int ntokens = 8;
thread inputfunc(input_function);
tick_count t1,t2;
ofstream o("filter700Hz.csv");
t1=tick_count::now();
bool stop_pipeline = false;
stop_filter=false;
inputfunc.join();
parallel_pipeline(ntokens,make_filter<void,MyBuffer*>
(
filter::parallel,[&](flow_control& fc)->MyBuffer*
{
if(queueloopcount==0)
{
fc.stop();
cout<<"pipeline stopped"<<endl;
}
else
{
MyBuffer *b=new MyBuffer;
chunk_queue.pop(*b);
{
cout<<"value of start and end popped is "<<b->startnumber()<<"\t"<<b->endnumber()<<endl;
queueloopcount--;
}
return b;
}
}
)&
make_filter<MyBuffer*, void>
(
filter::serial,[&](MyBuffer* b)
{
cout<<"value of second filter start is and end is \t "<< b->startnumber() << "\t" << b->endnumber() <<endl;
}
)
);
cout<<"now i am out" <<endl;
o.close();
t2=tick_count::now();
cout << "\n Time elapsed is \n\n" <<(t2-t1).seconds()<<endl;
return 0;
}
Пожалуйста, помогите найти неправильный код.
1 ответ
Проблема в этом коде связана с фильтром, т. Е. Параллельным на первом этапе, который вызывает проблему для объекта flow_control, который пытается всплыть, и это блокирующий вызов, из-за которого он блокируется, и решение этой проблемы заключается в том, что у вас, вероятно, должен быть последовательный первый фильтр, который только создайте пустой MyBuffer * и остановите конвейер, если больше нет работы. Затем установите параллельный второй фильтр, который выполняет реальную работу, и, наконец, последовательный (по порядку) выходной каскад.