GStreamer custom-built pipeline not playing/hanging

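Context

I am trying to build a small video clipper that handles video files with multiple audio tracks, for example one for voice chat, one for the microphone, and one for in-game audio. The end goal is to export clips with all audio tracks mixed together, and eventually to be able to change the volume of each track.

I am building the UI with Qt.

Before building the rendering/export part, I am trying to build a small multi-track audio/video viewer in my application using GStreamer.

Pipeline

Following the GStreamer tutorials, I built a PlayerPipeline class that inherits from QObject (code further down in this post). The class takes a file path and a QWidget as parameters and builds a pipeline from a uridecodebin to reach this shape (here for a video with 2 audio tracks):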

|              | -> video  ->  | Queue | ---------------------------------------> | PlaySink |
| UriDecodeBin | -> audio1 ->  | Queue | -> | AudioConvert | -> | AudioMixer | -> |          |
|              | -> audio2 ->  | Queue | -> | AudioConvert | -> |            |

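(I am not sure the queues are needed here, because if I remember correctly uridecodebin already creates a queue for each of its src pads.)

It does so through the following steps:

Problem

Two things happen.

If I call gst_element_get_state with no timeout after uridecodebin has prerolled, it sometimes hangs indefinitely.

If I do not call it and instead try to set the pipeline to the PLAYING state, nothing happens, and I receive no state-changed message on the bus.

I tried adding queues (following another SO post), but I admit I am a bit lost now. Any help, or even ideas for how to tackle the problem or get more insight, would be greatly appreciated.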
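The indefinite hang described above is consistent with waiting, without a deadline, for something that never completes. As a GStreamer-free sketch of the alternative, a wait can be bounded and its outcome reported instead of blocking forever. The names waitWithDeadline and WaitResult are hypothetical and only serve this illustration; they are not part of Qt or GStreamer.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Outcome of a bounded wait (hypothetical type, for illustration only).
enum class WaitResult { Done, TimedOut };

// Hypothetical helper: polls `isDone` until it returns true or until
// `timeout` has elapsed, sleeping `pollInterval` between two checks.
inline WaitResult waitWithDeadline(
    const std::function<bool()>& isDone,
    std::chrono::milliseconds timeout,
    std::chrono::milliseconds pollInterval = std::chrono::milliseconds(5)) {
    assert(isDone && "a predicate is required");
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    for (;;) {
        if (isDone()) {
            return WaitResult::Done;      // condition met before the deadline
        }
        if (std::chrono::steady_clock::now() >= deadline) {
            return WaitResult::TimedOut;  // give up instead of hanging
        }
        std::this_thread::sleep_for(pollInterval);
    }
}
```

With GStreamer, the same idea would be to pass a finite timeout to gst_element_get_state (for instance 500 * GST_MSECOND rather than GST_CLOCK_TIME_NONE) and to treat a GST_STATE_CHANGE_ASYNC return value as "still changing" rather than as an error. This only avoids the freeze; it does not explain why the state change never completes.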

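Code

Note that no memory management is done here, so the code is probably full of leaks. That is expected: I want a working version of the player before adding cleanup and proper error handling.

This file, gstmanips.h, declares a few helpers that are probably simple to implement, so I will not detail them all here.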

bool isPadSrc(GstPad* pad, std::string name);

void printPipelineState(GstPipeline* pipeline);

void printElementState(GstElement* elem);

void flushAndPrintBus(GstBus* bus, GstPipeline* pipe);

Only the implementations of printPipelineState and flushAndPrintBus are shown:


void printPipelineState(GstPipeline* pipeline){
    GstStateChangeReturn ret;
    GstState state;
    GstState pending;

    ret = gst_element_get_state(GST_ELEMENT(pipeline), &state, &pending, GST_CLOCK_TIME_NONE);
    if(ret == GST_STATE_CHANGE_FAILURE){
        std::cout << "failed querying state" << std::endl;
        return;
    }
    if(ret == GST_STATE_CHANGE_SUCCESS){
        std::cout << "successfully queried the state of pipeline" << std::endl;
        std::string str = "pipeline ";
        str += std::string(gst_element_state_get_name(state));
        std::cout << str << std::endl;
    }
}

void flushAndPrintBus(GstBus* bus, GstPipeline* pipe){
    if(!bus){
        return;
    }
    GstMessage* gm = gst_bus_pop(bus);
    while(gm){
        GError *err;
        gchar *debug_info;

        switch (GST_MESSAGE_TYPE (gm)) {
            case GST_MESSAGE_ERROR:
                gst_message_parse_error (gm, &err, &debug_info);
                 std::cout << "## BUS ## Error received from element " << GST_OBJECT_NAME (gm->src) <<  " : " <<  err->message << std::endl;
                 std::cout << "## BUS ## Debugging information: " << (debug_info ? debug_info : "none")<< std::endl;
                 g_clear_error (&err);
                 g_free (debug_info);
                 break;
            case GST_MESSAGE_EOS:
                 std::cout << "## BUS ## End-Of-Stream reached." << std::endl;
                 break;
            case GST_MESSAGE_STATE_CHANGED:
            /* We are only interested in state-changed messages from the pipeline */
                if (GST_MESSAGE_SRC (gm) == GST_OBJECT (pipe)) {
                    GstState old_state, new_state, pending_state;
                    gst_message_parse_state_changed (gm, &old_state, &new_state, &pending_state);
                    std::cout << "## BUS ## Pipeline state changed from " <<
                        gst_element_state_get_name (old_state) << " to " << gst_element_state_get_name (new_state) << std::endl;
                }
                break;
            case GST_MESSAGE_STREAM_STATUS:
                GstStreamStatusType type;
                GstElement *owner;
                gst_message_parse_stream_status(gm, &type, &owner);
                std::cout << "Stream status : " << type << std::endl;
                break;
            default:
                /* We should not reach here */
                std::cout <<  "## BUS ## Unexpected message received: " << std::to_string(GST_MESSAGE_TYPE(gm)) << std::endl;

                break;
              }
        gm = gst_bus_pop(bus);
    }
}

Now the pipeline itself, implemented in playerpipeline.h and playerpipeline.cpp:

#ifndef PLAYERPIPELINE_H
#define PLAYERPIPELINE_H

#include <QObject>
#include <glib.h>
#include <gst/gst.h>
#include <gst/video/videooverlay.h>
#include "gstmanips.h"
#include <vector>
#include <QTimer>
#include <QWidget>


class PlayerPipeline : public QObject
{
    Q_OBJECT
public:
    explicit PlayerPipeline(QString file, QWidget* display, QObject *parent = nullptr);
    void triggerNoMorePadSignal();
    void increasePreroll();
    void decreasePreroll();


signals:
    void noMorePadSignal();
    void decoderPrerolledSignal();
    void decoderBuiltSignal();
    void pipelineBuiltSignal();


public slots:
    void onNoMorePad();
    void onBusRefresh();
    void onDecoderPrerolled();
    void onPipelineBuilt();
    void play();
    void pause();

private:
    void changePipelineState(GstState state);
    void buildAndLinkConverters();
    void buildAndLinkMixer();
    void buildAndLinkPlaySink();

public:
    gint prerollCounter;
    bool prerolled;

private:
    //misc
    QString file;

    //pipeline
    GstBus* bus;
    GstPipeline* pipeline;
    QTimer* timer;
    unsigned timerStop;

    //decoder
    GstElement* decoder;
    std::vector<GstPad*> decoderAudioSrcs;
    GstPad* decoderVideoSrc;


    //audio convert (TODO add volume after)
    std::vector<GstElement*> audioConverters;
    std::vector<GstPad*> audioConvertersSrc;
    std::vector<GstPad*> audioConvertersSink;
    std::vector<GstElement*> audioQueues;

    //mixer
    GstElement* audioMixer;
    std::vector<GstPad*> audioMixerSinks;
    GstPad* audioMixerSrc;

    //sink
    GstElement* videoQueue;
    GstElement* playSink;
    GstPad* playSinkVideoSink;
    GstPad* playSinkAudioSink;
    GstElement* xvideo;
    QWidget* display;
};

#endif // PLAYERPIPELINE_H

#include "playerpipeline.h"
#include <assert.h>
#include <iostream>

void pp_no_more_pads_cb(GstElement* self, gpointer udata){
    (void)self;
    PlayerPipeline* db = (PlayerPipeline*)udata;
    std::cout << "no more pad :" <<std::endl;
    db->triggerNoMorePadSignal();
}

PlayerPipeline::PlayerPipeline(QString file, QWidget* display, QObject *parent)
    : QObject{parent}, prerollCounter(0), prerolled(false), file(file), display(display)
{
    pipeline = (GstPipeline*)gst_pipeline_new("PlayerPipeline");
    assert(pipeline);
    bus = gst_pipeline_get_bus(pipeline);
    assert(bus);
    decoder = gst_element_factory_make("uridecodebin", "decoder");
    assert(decoder);
    assert(gst_bin_add(GST_BIN(pipeline), decoder));
    assert(gst_element_sync_state_with_parent(decoder));
    std::string uri = std::string("file://")+file.toStdString();
    g_object_set(decoder, "uri", uri.c_str(), nullptr);

    QObject::connect(this, &PlayerPipeline::noMorePadSignal, this, &PlayerPipeline::onNoMorePad, Qt::QueuedConnection);
    g_signal_connect(decoder, "no-more-pads", G_CALLBACK(pp_no_more_pads_cb), this);
    QObject::connect(this, &PlayerPipeline::decoderPrerolledSignal, this, &PlayerPipeline::onDecoderPrerolled, Qt::QueuedConnection);
    QObject::connect(this, &PlayerPipeline::pipelineBuiltSignal, this, &PlayerPipeline::onPipelineBuilt);
    changePipelineState(GST_STATE_PLAYING);

    timer = new QTimer(this);
    QObject::connect(timer, &QTimer::timeout, this, &PlayerPipeline::onBusRefresh);
    timer->start(1);
    timerStop = 0;
}


void PlayerPipeline::triggerNoMorePadSignal()
{
    emit noMorePadSignal();
}


void PlayerPipeline::increasePreroll()
{
    g_atomic_int_inc(&prerollCounter);
}

void PlayerPipeline::decreasePreroll()
{
    if(g_atomic_int_dec_and_test(&prerollCounter)){
        this->prerolled = true;
        emit decoderPrerolledSignal();
    }
}

// FIXME there exists probably a better way to implem this
bool pp_caps_is_audio(GstCaps* caps){
    std::string description(gst_caps_to_string(caps));
    return description.rfind("audio/", 0) == 0;
}

// FIXME there exists probably a better way to implem this
bool pp_caps_is_video(GstCaps* caps){
    std::string description(gst_caps_to_string(caps));
    return description.rfind("video/", 0) == 0;
}

// FIXME maybe we need to use it to trigger when the pipeline is actually prerolled ?
GstPadProbeReturn blocked_cb(GstPad* pad, GstPadProbeInfo* info, gpointer udata){
    (void)pad;
    (void)info;
    std::cout << "blocked cb"<< std::endl;
    PlayerPipeline* pp = (PlayerPipeline*)udata;
    if(pp->prerolled){
        std::cout << "removing blocking probe" << std::endl;
        return GST_PAD_PROBE_REMOVE;
    } else {
        std::cout << "blocking probe installed !" << std::endl;
        pp->decreasePreroll();
        std::cout << "preroll decreased, exiting blocking_cb" << std::endl;
        return GST_PAD_PROBE_OK;
    }
}

void PlayerPipeline::onNoMorePad(){
    std::cout << "### LISTING PADS ###" << std::endl;
    changePipelineState(GST_STATE_PAUSED);
    GstIterator *it = gst_element_iterate_src_pads(decoder);
    GValue padV = G_VALUE_INIT;
    while(gst_iterator_next(it, &padV) == GST_ITERATOR_OK){
        std::cout << "PAD : " << std::endl;
        GstPad* pad = (GstPad*)g_value_get_object(&padV);
        assert(pad);
        GstCaps* caps = gst_pad_get_current_caps(pad);
        assert(caps);
        std::string description(gst_caps_to_string(caps));
        std::cout << description << std::endl;
        //FIXME some code duplication here, could likely be factorized ?
        if(pp_caps_is_audio(caps)){
            decoderAudioSrcs.push_back(pad);
        } else if (pp_caps_is_video(caps)){

            decoderVideoSrc = pad;
        } else {
            std::cout << "UNKNOWN PAD FOUND" << std::endl;
        }
        //probing the pad.
        this->increasePreroll();
        gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, blocked_cb, (gpointer)this, NULL);
    }
    std::cout << "waiting for pipeline to be prerolled" << std::endl;
}



void PlayerPipeline::onBusRefresh()
{
    //std::cout << "### timer timeouted ###" << std::endl;
    flushAndPrintBus(bus, pipeline);
    //timerStop += 1;
    //if (timerStop > 130){
    //    timer->stop();
    //}
}


void PlayerPipeline::onDecoderPrerolled()
{
    //here we can build the rest of the pipeline
    std::cout << "printing pipeline state after prerolling" << std::endl;
    printPipelineState(pipeline);
    buildAndLinkConverters();
    buildAndLinkMixer();
    buildAndLinkPlaySink();
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-prerolled");
    emit pipelineBuiltSignal();
}

void PlayerPipeline::buildAndLinkConverters(){
    std::cout << "BUILDING AND LINKING CONVERTERS" << std::endl;
    unsigned pRank = 0;
    for(auto dpad : decoderAudioSrcs){
        std::cout << "building audio queue" << std::endl;
        GstElement* queue = gst_element_factory_make("queue", (std::string("audioqueue_") + std::to_string(pRank)).c_str());
        audioQueues.push_back(queue);
        assert(queue);
        assert(gst_bin_add(GST_BIN(pipeline), queue));
        assert(gst_element_sync_state_with_parent(queue));
        GstPad* qsink = gst_element_get_static_pad(queue, "sink");
        GstPad* qsrc = gst_element_get_static_pad(queue, "src");
        assert(gst_pad_link(dpad, qsink) == GST_PAD_LINK_OK);
        std::cout << "building audio converter" << std::endl;
        GstElement* convert = gst_element_factory_make("audioconvert", (std::string("converter_") + std::to_string(pRank)).c_str());
        assert(convert);
        assert(gst_bin_add(GST_BIN(pipeline), convert));
        assert(gst_element_sync_state_with_parent(convert));
        GstPad* convsink = gst_element_get_static_pad(convert, "sink");
        assert(convsink);
        assert(gst_pad_link(qsrc, convsink) == GST_PAD_LINK_OK);
        audioConverters.push_back(convert);
        audioConvertersSink.push_back(convsink);
        //GstPad* convsrc = gst_element_get_static_pad(convert, "src");
        //assert(convsrc);
        //audioConvertersSrc.push_back(convsrc);
        pRank++;
    }
    std::cout << "printing pipeline state after converters" << std::endl;
    printPipelineState(pipeline);
    std::cout << "CONVERTERS : DONE" << std::endl;
}

void PlayerPipeline::buildAndLinkMixer(){
    std::cout << "BUILDING AND LINKING MIXER" << std::endl;
    audioMixer = gst_element_factory_make("audiomixer", "audioMixer");
    assert(audioMixer);
    assert(gst_bin_add(GST_BIN(pipeline), audioMixer));
    assert(gst_element_sync_state_with_parent(audioMixer));
    for(auto converter : audioConverters){
        assert(gst_element_link(converter, audioMixer));
    }
    std::cout << "printing pipeline state after adding mixer" << std::endl;
    printPipelineState(pipeline);
    std::cout << "MIXER : DONE" << std::endl;
}


GstPad* pp_requestPad(GstElement* elem, const char* tname, const char* name){
    GstPadTemplate* ptempl = gst_element_get_pad_template(elem, tname);
    if(ptempl == nullptr){
        std::cout << "failed to retreive pad template " << std::string(tname) << std::endl;
    }
    GstPad* pad = gst_element_request_pad(elem, ptempl, name, nullptr);
    if(pad == nullptr){
        std::cout << "pad request for " << std::string(name) << " returned nullptr" << std::endl;
    }
    return pad;
}

void PlayerPipeline::buildAndLinkPlaySink(){
    std::cout << "BUILDING AND LINKING PLAYSINK" << std::endl;
    playSink = gst_element_factory_make("playsink", "playSink");
    assert(playSink);
    WId xwinid = display->winId();
    gst_video_overlay_set_window_handle(GST_VIDEO_OVERLAY(playSink), xwinid);
    assert(gst_bin_add(GST_BIN(pipeline), playSink));
    assert(gst_element_sync_state_with_parent(playSink));
    assert(gst_element_link(audioMixer, playSink));
    playSinkVideoSink = pp_requestPad(playSink, "video_raw_sink", "video_raw_sink");
    assert(decoderVideoSrc);
    assert(playSinkVideoSink);


    std::cout << "building video queue" << std::endl;
    GstElement* queue = gst_element_factory_make("queue", "videoqueue");
    videoQueue = queue;
    assert(queue);
    assert(gst_bin_add(GST_BIN(pipeline), queue));
    assert(gst_element_sync_state_with_parent(queue));
    GstPad* qsink = gst_element_get_static_pad(queue, "sink");
    assert(qsink);
    GstPad* qsrc = gst_element_get_static_pad(queue, "src");
    assert(qsrc);
    assert(gst_pad_link(decoderVideoSrc, qsink) == GST_PAD_LINK_OK);
    assert(gst_pad_link(qsrc, playSinkVideoSink) == GST_PAD_LINK_OK);

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-psink");
    std::cout << "PLAYSINK : DONE" << std::endl;
}

void PlayerPipeline::onPipelineBuilt(){
    //TODO implement here
    //here we can remove blocking probes. (by playing ?)
    std::cout << "UNPREROLLING THE PIPELINE" << std::endl;
    //printPipelineState(pipeline);
   // std::cout << "printing pipeline state before seeking" << std::endl;
   // printPipelineState(pipeline);
    assert(gst_element_seek(GST_ELEMENT(pipeline), 1.0,
                     GST_FORMAT_TIME,
                     GstSeekFlags(GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE),
                     GST_SEEK_TYPE_SET, 0,
                     GST_SEEK_TYPE_NONE, 0));
    // play();
    std::cout << "DONE" << std::endl;
}

void PlayerPipeline::play()
{
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-play");

}

void PlayerPipeline::pause()
{
    std::cout << "paused ..." << std::endl;
    changePipelineState(GST_STATE_PAUSED);
}


void PlayerPipeline::changePipelineState(GstState state)
{
    GstStateChangeReturn ret = gst_element_set_state(GST_ELEMENT(pipeline), state);
    if(ret == GST_STATE_CHANGE_FAILURE){
        std::cout << "could not set pipeline to state : " << gst_element_state_get_name (state) << std::endl;
        exit(124);
    }
}

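The methods and functions should execute in the following order:

In parallel (not truly in parallel: as events in the Qt event loop), onBusRefresh is called on a timer every 1 ms. It uses the following implementation:

If the program is not stuck in the printPipelineState call at this point and I call the play() or pause() methods, nothing happens (apart from the messages printed to standard output). I also checked, and the pads are still blocked at this point.

Edit: the pipeline at the time the play method is called looks like this (I suggest opening it in a new tab to zoom):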

In the end I found the solution: removing the probes manually instead of relying on a flushing seek and callback black magic.

Adding a probe returns a gulong:

gulong
gst_pad_add_probe (GstPad * pad,
                   GstPadProbeType mask,
                   GstPadProbeCallback callback,
                   gpointer user_data,
                   GDestroyNotify destroy_data)

So I store it in a vector alongside the pad, then remove the probes manually in onPipelineBuilt using:

void
gst_pad_remove_probe (GstPad * pad,
                      gulong id)
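The bookkeeping this implies can be sketched without GStreamer as a small registry that pairs each pad with the id returned when its probe was added, and removes them all on request. ProbeRegistry, remember and removeAll are hypothetical names, not GStreamer API; the pad and id types are template parameters here so the sketch compiles on its own (with GStreamer they would be GstPad* and gulong).

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper (not part of GStreamer): remembers which probe id was
// installed on which pad so that the probes can be removed explicitly later.
template <typename Pad, typename ProbeId = unsigned long>
class ProbeRegistry {
public:
    using Remover = std::function<void(Pad, ProbeId)>;

    // `remover` is the call that actually removes one probe from one pad.
    explicit ProbeRegistry(Remover remover) : remover_(std::move(remover)) {
        assert(remover_ && "a remover callable is required");
    }

    // Record the id returned when the probe was added to `pad`.
    // An id of 0 is skipped: gst_pad_add_probe returns 0 when no probe
    // is pending, so there is nothing to remove in that case.
    void remember(Pad pad, ProbeId id) {
        if (id != 0) {
            entries_.emplace_back(pad, id);
        }
    }

    // Remove every recorded probe once, then forget them all.
    // Returns the number of probes that were removed.
    std::size_t removeAll() {
        std::size_t removed = 0;
        for (const auto& entry : entries_) {
            remover_(entry.first, entry.second);
            ++removed;
        }
        entries_.clear();
        return removed;
    }

    std::size_t size() const { return entries_.size(); }

private:
    Remover remover_;
    std::vector<std::pair<Pad, ProbeId>> entries_;
};
```

In the player, onNoMorePad would pass the return value of gst_pad_add_probe to remember(), and onPipelineBuilt would call removeAll() with a remover that forwards to gst_pad_remove_probe. Clearing the entries after removal keeps a second call from trying to remove the same id twice.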

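I do not know why the callback + flush approach did not work, even though it is shown in this tutorial: https://gstreamer.freedesktop.org/documentation/application-development/advanced/pipeline-manipulation.html?gi-language=c I probably did something wrong.

Edit: I answered rather quickly; if you need more details or the full code of the working pipeline, feel free to ask (in the comments).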