QtGstreamer Appsink: зависает и медленно / неиспользуемые образцы
Моя цель - создать простой пользовательский приемник, способный получать данные из конвейера, которые затем следует использовать для различных приложений (запись, трансляция, внутренняя буферизация и т. д.).
В моей первой попытке идея состоит в том, чтобы повторно передать Http(s)/Udp/etc. Потоковая передача через Http снова, поэтому я использую souphttpsrc, очередь и QHttp для передачи данных одному или нескольким клиентам.
Кажется, что приложения работают, так как я могу запустить конвейер с моим собственным приемником (который по умолчанию просто игнорирует любой пример, пока не подключен хотя бы один клиент), поэтому я просто копирую образцы в ответе клиента.
Что действительно странно, так это то, что скорость загрузки действительно намного ниже (5-10 КБ / с вместо 2-300 КБ / с, как и ожидалось), чем скорость входящей потоковой передачи, и, кроме того, данные кажутся совершенно непригодными для использования. Я пытался вставить декодер в конвейер до того, как приемник и скорость достигли 33 Мбит / с, поэтому я исключил бы проблемы с производительностью, вызванные вызовом метода в очереди в главном потоке, отвечающем за отправку фактического образца через сеть; даже в этом случае данные кажутся просто мусором.
Во-вторых, если я добавлю тройник (или несколько очередей) для параллельного запуска autovideosink и / или autoaudiosink, конвейер застревает при запуске (это не происходит только с двумя autosink), и снова дефект такой же, если Я пытаюсь связать только два пользовательских приложения на футболку.
При отладке кода кажется, что newSample() никогда не вызывается, когда конвейер зависает, даже если последнее сообщение, полученное на шине, является изменением состояния воспроизведения пользовательского приемника.
Спасибо!
Вот код:
QGst:: FlowReturn MultiHttpSink:: newSample() {
QGst::SamplePtr sample = pullSample();
char data[sample->buffer()->size()];
qDebug() << "New Sample: size " << sample->buffer()->size();
qDebug() << sample->buffer()->duration().toTime();
qDebug() << sample->buffer()->presentationTimeStamp().toTime();
sample->buffer()->extract(0, &data, sample->buffer()->size());
for (auto it = resources.begin(); it != resources.end(); it++)
QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, data));
return QGst::FlowOk;
}
void Server::setupServer() {
server = new QHttpServer(app);
server->listen(QHostAddress::Any, 8080, [&](QHttpRequest* req, QHttpResponse * res) {
res->setStatusCode(qhttp::ESTATUS_OK); // http status 200
res->addHeader("Content-Type", "video/mp2t");
m_sink.addAudience(res);
});
if (!server->isListening()) {
fprintf(stderr, "failed. can not listen at port 8080!\n");
throw std::exception();
}
void Server::setupPipeline() {
/* source pipeline */
QString pipe1Descr = QString(
"souphttpsrc location=\"%1\" ! "
"queue ! tee name=splitter "
"splitter.! decodebin ! autoaudiosink "
"splitter.! decodebin ! autovideosink "
"splitter.! appsink name=\"mysink\" "
).arg("URL");
pipeline1 = QGst::Parse::launch(pipe1Descr).dynamicCast<QGst::Pipeline>();
m_sink.setElement(pipeline1->getElementByName("mysink"));
QGlib::connect(pipeline1->bus(), "message", this, &Server::onBusMessage);
pipeline1->bus()->addSignalWatch();
/* start playing */
pipeline1->setState(QGst::StatePlaying);
}
QString stateString(QGst::State state) {
switch (state) {
case 0:
return "Void Pending";
case 1:
return "Null";
case 2:
return "Ready";
case 3:
return "Playing";
case 4:
return "Paused";
}
return "";
}
QString streamStateString(QGst::StreamStatusType type) {
switch (type) {
case 0:
return "Create";
case 1:
return "Enter";
case 2:
return "Leave";
case 3:
return "Destroy";
case 4:
return "Start";
case 5:
return "Pause";
case 6:
return "Stop";
}
}
void Server::onBusMessage(const QGst::MessagePtr& message) {
switch (message->type()) {
case QGst::MessageEos:
app->quit();
break;
case QGst::MessageError:
qCritical() << message.staticCast<QGst::ErrorMessage>()->error();
qCritical() << message.staticCast<QGst::ErrorMessage>()->debugMessage();
break;
case QGst::MessageStateChanged:
{
QGlib::RefPointer<QGst::StateChangedMessage> msg = message.staticCast<QGst::StateChangedMessage>();
qDebug() << msg->source()->name() <<": State changed from " << stateString(msg->oldState())
<< " -> " << stateString(msg->newState()) << " Transitioning to " << stateString(msg->pendingState());
if(msg->source()->name().compare("pipeline0")==0 ){
if( msg->newState() == QGst::StatePaused ){
need_reset = true;
playing = false;
} else {
need_reset = false;
if( msg->newState() == QGst::StatePlaying ){
playing = true;
}
}
}
break;
}
case QGst::MessageStreamStatus:
{
QGlib::RefPointer<QGst::StreamStatusMessage> msg = message.staticCast<QGst::StreamStatusMessage>();
qDebug() << "Stream Status: " << streamStateString(msg->statusType());
break;
}
default:
qDebug() << "Unhandles Bus Message Type: " << message->typeName();
break;
}
}
void Server::writeToHttpResource(QPointer<qhttp::server::QHttpResponse> res, QByteArray data) {
qDebug() << "Writing data...";
res->write(data);
}
ОБНОВИТЬ
Журнал отладки gstreamer сообщает что-то полезное...
0:00:00.733339246 [335m 8248[00m 0x2747450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
0:00:00.733350762 [335m 8248[00m 0x274a8f0 [37mDEBUG [00m [00m tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f69d000fb80
0:00:00.733353629 [335m 8248[00m 0x2747450 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f69d0015120, maxsize:4103 offset:0 size:4096
0:00:00.733361430 [335m 8248[00m 0x274a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f69d000fb80, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 15582, offset_end none, flags 0x0
Размер входящих сетевых пакетов составляет 4096, но размер буфера в большинстве случаев составляет 1430... исследуя.... я думаю, что я должен получить доступ к буферу другим способом, чтобы получить доступ к необработанным данным.
Размер сетевых пакетов равен 1430, 4096, похоже, является буферной областью...
0:00:29.316301105 [335m 8248[00m 0x2747450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes
Почему передача медленная и бессмысленная, пока остается загадкой. Почему он тоже зависает!
ОБНОВЛЕНИЕ 2
Я также пытался таким же образом, те же результаты (и сообщили о тех же размеров):
QGst::FlowReturn MultiHttpSink::newSample() {
QGst::SamplePtr sample = pullSample();
QGst::BufferPtr buffer = sample->buffer();
qDebug() << "New Sample: size " << buffer->size();
QGst::MapInfo map;
if (!buffer->map(map, QGst::MapRead))
return QGst::FlowError;
qDebug() << "New Buffer Map: size " << map.size();
for (auto it = resources.begin(); it != resources.end(); it++)
QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data()));
buffer->unmap(map);
return QGst::FlowOk;
}
Вот вывод последней версии, такой же, как и раньше:
... ...
New Sample: size 1430
New Buffer Map: size 1430
QTime("23:34:33.709")
QTime("23:34:33.709")
Writing data...
New Sample: size 4096
New Buffer Map: size 4096
QTime("23:34:33.709")
QTime("23:34:33.709")
Writing data...
... ...
ОБНОВЛЕНИЕ 3
Это журнал между каждой порцией данных:
New Sample: size 1430
New Buffer Map: size 1430
QTime("23:34:33.709")
QTime("23:34:33.709")
0:04:28.221638796 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:3566:gst_base_sink_chain_unlocked:<mysink2>[00m object unref after render 0x7f894400f850
0:04:28.221653137 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<mysink2:sink>[00m called chainfunction &gst_base_sink_chain with buffer 0x7f894400f850, returned ok
0:04:28.221667131 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m tee gsttee.c:778:gst_tee_chain:<splitter>[00m handled buffer ok
0:04:28.221676164 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<splitter:sink>[00m called chainfunction &gst_tee_chain with buffer 0x7f894400f850, returned ok
0:04:28.221687738 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m queue_dataflow gstqueue.c:1482:gst_queue_loop:<queue0>[00m queue is empty
0:04:28.222861204 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
0:04:28.222896720 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944012fe0, maxsize:4103 offset:0 size:4096
0:04:28.222926663 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes
0:04:28.222956032 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m basesrc gstbasesrc.c:2316:gst_base_src_do_sync:<souphttpsrc0>[00m no sync needed
0:04:28.222968263 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m basesrc gstbasesrc.c:2520:gst_base_src_get_range:<souphttpsrc0>[00m buffer ok
0:04:28.222978663 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<queue0:sink>[00m calling chainfunction &gst_queue_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
0:04:28.223004500 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<queue0:sink>[00m called chainfunction &gst_queue_chain with buffer 0x7f894400f630, returned ok
0:04:28.223013700 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m queue_dataflow gstqueue.c:1494:gst_queue_loop:<queue0>[00m queue is not empty
0:04:28.223031507 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m basesrc gstbasesrc.c:2355:gst_base_src_update_length:<souphttpsrc0>[00m reading offset 20790622, length 4096, size -1, segment.stop -1, maxsize -1
0:04:28.223066379 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m basesrc gstbasesrc.c:2456:gst_base_src_get_range:<souphttpsrc0>[00m calling create offset 20790622 length 4096, time 0
0:04:28.223056071 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<splitter:sink>[00m calling chainfunction &gst_tee_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
0:04:28.223111831 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f894400f630
0:04:28.223121952 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
0:04:28.223139071 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
0:04:28.223152343 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944015120, maxsize:4103 offset:0 size:4096
0:04:28.223179317 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:3409:gst_base_sink_chain_unlocked:<mysink2>[00m got times start: 99:99:99.999999999, end: 99:99:99.999999999
0:04:28.223208511 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:1958:gst_base_sink_get_sync_times:<mysink2>[00m got times start: 99:99:99.999999999, stop: 99:99:99.999999999, do_sync 0
0:04:28.223222869 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944015120
0:04:28.223238437 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;04m default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1)
0:04:28.223281513 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;04m default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1)
0:04:28.223298205 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:3520:gst_base_sink_chain_unlocked:<mysink2>[00m rendering object 0x7f894400f630
0:04:28.223312428 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:946:gst_base_sink_set_last_buffer_unlocked:<mysink2>[00m setting last buffer to 0x7f894400f630
0:04:28.223321131 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944014080
0:04:28.223344396 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;34m GST_CAPS gstpad.c:2641:gst_pad_has_current_caps:<mysink2:sink>[00m check current pad caps (NULL)
0:04:28.223355939 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m appsink gstappsink.c:760:gst_app_sink_render:<mysink2>[00m pushing render buffer 0x7f894400f630 on queue (0)
0:04:28.223369213 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m appsink gstappsink.c:1286:gst_app_sink_pull_sample:<mysink2>[00m trying to grab a buffer
0:04:28.223377762 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m appsink gstappsink.c:706:dequeue_buffer:<mysink2>[00m dequeued buffer 0x7f894400f630
0:04:28.223386131 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m appsink gstappsink.c:1301:gst_app_sink_pull_sample:<mysink2>[00m we have a buffer 0x7f894400f630
New Sample: size 1430
New Buffer Map: size 1430
QTime("23:34:33.709")
QTime("23:34:33.709")
2 ответа
Я наконец нашел загадочную ошибку!
QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data()));
Даже если этот код компилируется, он работает плохо, так как это:
Q_ARG(QByteArray, (char *)map.data())
Использование QByteArray(char *) в этом случае некорректно, поскольку мы инициализируем его не из строки, а из фрагмента необработанных данных. Записанный, как это было, фрагмент анализировался все время в поисках новой комбинации строк, что случайно отбирало фрагмент данных.
Чтобы избежать этого, используется конструктор QByteArray (char * ptr, int size).
Это правильная версия метода:
QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<GssClientRequest>, *it), Q_ARG(QByteArray, QByteArray((char *) map.data(), map.size())));
Не было такой сложной ошибки, чтобы победить в конце;)
Спасибо за поддержку!
Добавьте очередь перед декодированием и после тройника, чтобы запустить контекст.
"souphttpsrc location = \"% 1 \ "!" "queue! tee name = splitter" "splitter.! queue! decodebin! autoaudiosink" "splitter.! очередь! decodebin! autovideosink " "разделитель.! appsink name=\"mysink\" "
последний сплиттер, связанный с appsink, не нуждается в очереди, он будет работать в контексте созданного исходного плагина gsttask.