Gstreamer,如何从(rtmpsink)错误中恢复

Gstreamer, how recover from (rtmpsink) error

我正在 python 中使用 gstreamer 构建流媒体应用程序。

应用程序使用 tee 元素将数据写入 rtmpsink 和 filesink。启动和流媒体在理想环境(本地网络)中工作正常,但是如果与流媒体服务器断开连接怎么办?我正在尝试弄清楚如何保持管道 运行 并因此在发生错误后继续写入 filesink...

我要归档的内容:

  1. 至少我想在流媒体部分 (rtmpsink) 发生错误后保留我的存档文件 (filesink)。因此,如果发生错误,我们有一些备份。
  2. 手动重新连接到流媒体服务器。
  3. 建立一些机制来检查连接并在可能的情况下重新连接流式传输部分 (rtmpsink)。

问题:

是否可以存档我正在尝试做的事情?

如何归档(动态管道/探测/额外元素)?

非常感谢任何解释、示例或指向正确方向的信息。

注:

Gst 版本: gstreamer 1.3.90 (rtmpsink, faac, x264enc)
OS: ubuntu 14.04 LTS
流媒体服务器: wowza 4.x

测试应用程序(代码): link
启动后的流水线(OK): link

pipeline after rtmpsink error(Failed to write data): link
rtmpsink 错误后的日志片段(写入数据失败): link

我不确定使用单个管道获得的系统有多可靠。我建议做的是创建一个两阶段流程:

1) audio -> encode -> tee -> filesink
                          -> shmsink 

2) shmsrc -> mux -> rtmpsink

然后为第二个管道创建包装器脚本。下面是如何将这些元素与 videotestsrc 一起使用的示例。请注意,上限非常重要——它们必须足够详细才能知道共享内存中的内容。

gst-launch-1.0 videotestsrc ! tee name=t ! queue ! videoconvert ! ximagesink t. ! video/x-raw,width=400,height=400,format=BGRA ! shmsink wait-for-connection=false socket-path=/tmp/shr

gst-launch-1.0 shmsrc socket-path=/tmp/shr ! video/x-raw,width=400,height=400,format=BGRA,framerate=30/1 ! videoconvert ! ximagesink

您也可以使用 TCP/UDP 而不是共享内存来尝试这种方法。我没有安装 faac 插件,但管道可能是这样的:

audio -> faac -> rtpmp4apay -> udpsink host=localhost port=1919

udpsrc port=1919 -> rtpmp4adepay -> mux -> rtmpsink

我也一直在尝试让管道在出错后重新连接到 RTMP 服务器。原则上我同意 (使用与 shmsink/shmsrc 对连接的两个管道)但我无法让它可靠地工作,所以我最终使用了不同的策略。

我正在使用 rtmp2sink,当它遇到错误时,它会 post 管道总线上的消息,然后 return GST_FLOW_FLUSHING 导致管道冲洗一切。这不是我感兴趣的,所以我在 rtmp2sink 前面添加了一个 GhostPad,它捕获 return 值并将其转回 GST_FLOW_OK。那时我还重置了 rtmp2sink 元素以使其重新连接。

这看起来相当可靠,至少在我使用的 RTMP 服务器上,我不需要做任何特殊的事情来处理来自编码器的关键帧。

所有这些都使用 Gstreamer 版本 1.18.5 进行了测试。这是 Python 中展示此方法的一个非常基本的示例:

#!/usr/bin/env python3
import gi

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

def _handle_message(_, message, loop):
    """handle messages posted to pipeline bus"""
    if message.type == Gst.MessageType.EOS:
        print("End-of-stream")
        loop.quit()
    elif message.type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        if message.src.__class__.__name__ == "GstRtmp2Sink" and err.matches(
            Gst.ResourceError.quark(), Gst.ResourceError(err.code)
        ):
            resource_error = Gst.ResourceError(err.code)
            print(f"caught {resource_error} from rtmp2sink, ignoring")
        else:
            print(f"caught error {err} ({debug}), exiting")
            loop.quit()
    return True

def _wrap_rtmp_sink(rtmpsink: Gst.Element):
    """wrap RTMP sink to make it handle reconnections"""

    def _chain(pad: Gst.Pad, _, buffer: Gst.Buffer):
        internal_pad = pad.get_internal()
        result = internal_pad.push(buffer)
        if result == Gst.FlowReturn.FLUSHING or result == Gst.FlowReturn.ERROR:
            print(f"Restarting RTMP sink after {result}")
            rtmpsink.set_state(Gst.State.NULL)
            rtmpsink.set_state(Gst.State.PLAYING)
            return Gst.FlowReturn.OK

        return result

    sinkpad = rtmpsink.get_static_pad("sink")
    peer = sinkpad.get_peer()
    peer.unlink(sinkpad)

    ghost_pad = Gst.GhostPad.new("proxypad", sinkpad)
    ghost_pad.set_chain_function_full(_chain)
    peer.link(ghost_pad)
    ghost_pad.activate_mode(Gst.PadMode.PUSH, True)

    # hang on to GhostPad to avoid Python garbage collecting it
    rtmpsink._ghost_pad = ghost_pad

def main():
    Gst.init(None)
    pipeline = Gst.parse_launch(
        f"""
        videotestsrc
            ! video/x-raw,width=1280,height=720,framerate=30/1
            ! avenc_h264_videotoolbox
            ! h264parse
            ! flvmux.video

        audiotestsrc
            ! faac
            ! flvmux.audio

        flvmux name=flvmux streamable=true
            ! queue
            ! rtmp2sink name=rtmp location=rtmp://10.1.0.10/test/test
    """
    )

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", _handle_message, loop)

    _wrap_rtmp_sink(pipeline.get_by_name("rtmp"))

    pipeline.set_state(Gst.State.PLAYING)
    loop.run()
    pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    main()