使用 gst-rtsp-server Python 绑定处理错误

Handling errors with gst-rtsp-server Python bindings

我有一个简单的 Python 程序使用 gst-rtsp-server 创建 RTSP 流。它可以工作,但按原样没有错误处理。如果管道有错字或连接到视频源时出现问题,我看不到堆栈跟踪或任何日志记录。我应该在哪里挂接代码来处理这样的问题?

我应该提一下,我是 GObject 世界的初学者。我怀疑这些库有一种报告错误的标准方法,但我无法在我阅读的文档中找到有关如何完成的任何内容。

如果有帮助,这是我现在拥有的代码:

from threading import Thread
from time import sleep
import signal

import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtsp", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GLib, GObject, Gst, GstRtsp, GstRtspServer

PIPELINE = (
    "( videotestsrc ! vp8enc ! rtpvp8pay name=pay0 pt=96 )")


def main():
    GObject.threads_init()
    Gst.init(None)

    server = GstRtspServer.RTSPServer.new()
    server.props.service = "3000"

    server.attach(None)

    loop = GLib.MainLoop.new(None, False)

    def on_sigint(_sig, _frame):
        print("Got a SIGINT, closing...")
        loop.quit()
    signal.signal(signal.SIGINT, on_sigint)

    def run_main_loop():
        loop.run()

    main_loop_thread = Thread(target=run_main_loop)

    main_loop_thread.start()

    media_factory = GstRtspServer.RTSPMediaFactory.new()
    media_factory.set_launch(PIPELINE)
    media_factory.set_shared(True)
    server.get_mount_points().add_factory("/test", media_factory)
    print("Stream ready at rtsp://127.0.0.1:3000/test")


    while loop.is_running():
        sleep(0.1)


if __name__ == "__main__":
    main()

编辑:我建议使用 bkanuka 的解决方案(子类化 Gst.Bin)而不是下面的解决方案,后者有很多缺点。

经过对 GStreamer 和 RTSP 服务器库的更多实验,错误处理情况比较复杂。

正常方式

在 GStreamer 管道上查看错误的规范方法是向管道的总线添加观察器并侦听错误消息。

def watcher(bus, message, *user_data);
    if message.type == Gst.MessageType.ERROR:
        error, message = message.parse_error()
        # TODO: Do something with the error

my_pipeline.get_bus().add_watch(
    GLib.PRIORITY_DEFAULT,
    watcher,
    None)

但是,您不能使用提供给 GstRtspServer 的管道执行此操作。这是因为 GstRtspServer 期望能够在管道的总线上安装自己的观察器,并且一次只能将一个观察器连接到总线。这尤其不幸,因为这会阻止我们监听管道上的任何事件,而不仅仅是错误。

我的解决方法

我们可以将管道分成两部分:一个负责连接到源和解码帧的容易出错的过程,另一个负责编码结果帧并为 GstRtspServer 加载它们.然后我们就可以使用intervideo插件来实现两者之间的通信了。

例如,假设您正在尝试从 VP8 格式的文件进行流式传输。我们负责读取和解码帧的第一个管道如下所示:

filesrc location="{filepath}" ! decodebin ! intervideosink channel="file-channel"

...我们负责编码和加载帧的第二个管道如下所示:

intervideosrc channel="file-channel" ! videoconvert ! vp8enc deadline=1 ! rtpvp8pay name=pay0 pt=96

这里的关键是只有第二个管道必须由 GstRtspServer 管理,因为它是提供有效负载数据的管道。第一个由我们管理,我们可以将自己的观察者附加到它 它可以智能地响应错误并执行我们需要的任何其他操作。这当然不是一个完美的解决方案,因为我们无法响应与编码和有效负载相关的错误,但我们已经获得了接收与读取文件和解码相关的错误的能力。我们现在还可以执行其他与消息相关的任务,例如拦截流结束消息以循环播放视频文件。

因此您可以通过以下方式覆盖 Gst.Bin 中的 do_handle_message

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
Gst.init(None)

class SubclassBin(Gst.Bin):
    def do_handle_message(self, message):
        if message.type == Gst.MessageType.ERROR:
            error, message = message.parse_error()
            # TODO: Do something with the error
        # Call the base Gst.Bin do_handle_message
        super().do_handle_message(self, message)

subclass_bin = SubclassBin("mybin")

也就是说,我不确定如何告诉 GstRtspServer.RTSPMediaFactory 使用 SubclassBin 而不是 Gst.Bin 因为据我所知,这是连接任何 Bin 的唯一方法到 RTSPMediaFactory 是通过 set_launch 方法,该方法需要管道字符串而不是预构建的 bin。如果有一种方法可以向 RTSPMediaFactory 添加预构建的 bin,这将是一个完整的答案......但不幸的是,这是我所能得到的。