Gstreamer,如何从(rtmpsink)错误中恢复
Gstreamer, how recover from (rtmpsink) error
我正在 python 中使用 gstreamer 构建流媒体应用程序。
应用程序使用 tee
元素将数据写入 rtmpsink 和 filesink。启动和流媒体在理想环境(本地网络)中工作正常,但是如果与流媒体服务器断开连接怎么办?我正在尝试弄清楚如何保持管道 运行 并因此在发生错误后继续写入 filesink...
我要归档的内容:
- 至少我想在流媒体部分 (rtmpsink) 发生错误后保留我的存档文件 (filesink)。因此,如果发生错误,我们有一些备份。
- 手动重新连接到流媒体服务器。
- 建立一些机制来检查连接并在可能的情况下重新连接流式传输部分 (rtmpsink)。
问题:
是否可以存档我正在尝试做的事情?
如何归档(动态管道/探测/额外元素)?
非常感谢任何解释、示例或指向正确方向的信息。
注:
Gst 版本: gstreamer 1.3.90 (rtmpsink, faac, x264enc)
OS: ubuntu 14.04 LTS
流媒体服务器: wowza 4.x
测试应用程序(代码): link
启动后的流水线(OK): link
pipeline after rtmpsink error(Failed to write data): link
rtmpsink 错误后的日志片段(写入数据失败): link
我不确定使用单个管道获得的系统有多可靠。我建议做的是创建一个两阶段流程:
1) audio -> encode -> tee -> filesink
-> shmsink
2) shmsrc -> mux -> rtmpsink
然后为第二个管道创建包装器脚本。下面是如何将这些元素与 videotestsrc 一起使用的示例。请注意,上限非常重要——它们必须足够详细才能知道共享内存中的内容。
gst-launch-1.0 videotestsrc ! tee name=t ! queue ! videoconvert !
ximagesink t. ! video/x-raw,width=400,height=400,format=BGRA ! shmsink
wait-for-connection=false socket-path=/tmp/shr
gst-launch-1.0 shmsrc socket-path=/tmp/shr !
video/x-raw,width=400,height=400,format=BGRA,framerate=30/1 !
videoconvert ! ximagesink
您也可以使用 TCP/UDP 而不是共享内存来尝试这种方法。我没有安装 faac 插件,但管道可能是这样的:
audio -> faac -> rtpmp4apay -> udpsink host=localhost port=1919
udpsrc port=1919 -> rtpmp4adepay -> mux -> rtmpsink
我也一直在尝试让管道在出错后重新连接到 RTMP 服务器。原则上我同意 (使用与 shmsink/shmsrc 对连接的两个管道)但我无法让它可靠地工作,所以我最终使用了不同的策略。
我正在使用 rtmp2sink
,当它遇到错误时,它会 post 管道总线上的消息,然后 return GST_FLOW_FLUSHING 导致管道冲洗一切。这不是我感兴趣的,所以我在 rtmp2sink
前面添加了一个 GhostPad,它捕获 return 值并将其转回 GST_FLOW_OK。那时我还重置了 rtmp2sink
元素以使其重新连接。
这看起来相当可靠,至少在我使用的 RTMP 服务器上,我不需要做任何特殊的事情来处理来自编码器的关键帧。
所有这些都使用 Gstreamer 版本 1.18.5 进行了测试。这是 Python 中展示此方法的一个非常基本的示例:
#!/usr/bin/env python3
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
def _handle_message(_, message, loop):
"""handle messages posted to pipeline bus"""
if message.type == Gst.MessageType.EOS:
print("End-of-stream")
loop.quit()
elif message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
if message.src.__class__.__name__ == "GstRtmp2Sink" and err.matches(
Gst.ResourceError.quark(), Gst.ResourceError(err.code)
):
resource_error = Gst.ResourceError(err.code)
print(f"caught {resource_error} from rtmp2sink, ignoring")
else:
print(f"caught error {err} ({debug}), exiting")
loop.quit()
return True
def _wrap_rtmp_sink(rtmpsink: Gst.Element):
"""wrap RTMP sink to make it handle reconnections"""
def _chain(pad: Gst.Pad, _, buffer: Gst.Buffer):
internal_pad = pad.get_internal()
result = internal_pad.push(buffer)
if result == Gst.FlowReturn.FLUSHING or result == Gst.FlowReturn.ERROR:
print(f"Restarting RTMP sink after {result}")
rtmpsink.set_state(Gst.State.NULL)
rtmpsink.set_state(Gst.State.PLAYING)
return Gst.FlowReturn.OK
return result
sinkpad = rtmpsink.get_static_pad("sink")
peer = sinkpad.get_peer()
peer.unlink(sinkpad)
ghost_pad = Gst.GhostPad.new("proxypad", sinkpad)
ghost_pad.set_chain_function_full(_chain)
peer.link(ghost_pad)
ghost_pad.activate_mode(Gst.PadMode.PUSH, True)
# hang on to GhostPad to avoid Python garbage collecting it
rtmpsink._ghost_pad = ghost_pad
def main():
Gst.init(None)
pipeline = Gst.parse_launch(
f"""
videotestsrc
! video/x-raw,width=1280,height=720,framerate=30/1
! avenc_h264_videotoolbox
! h264parse
! flvmux.video
audiotestsrc
! faac
! flvmux.audio
flvmux name=flvmux streamable=true
! queue
! rtmp2sink name=rtmp location=rtmp://10.1.0.10/test/test
"""
)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", _handle_message, loop)
_wrap_rtmp_sink(pipeline.get_by_name("rtmp"))
pipeline.set_state(Gst.State.PLAYING)
loop.run()
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
main()
我正在 python 中使用 gstreamer 构建流媒体应用程序。
应用程序使用 tee
元素将数据写入 rtmpsink 和 filesink。启动和流媒体在理想环境(本地网络)中工作正常,但是如果与流媒体服务器断开连接怎么办?我正在尝试弄清楚如何保持管道 运行 并因此在发生错误后继续写入 filesink...
我要归档的内容:
- 至少我想在流媒体部分 (rtmpsink) 发生错误后保留我的存档文件 (filesink)。因此,如果发生错误,我们有一些备份。
- 手动重新连接到流媒体服务器。
- 建立一些机制来检查连接并在可能的情况下重新连接流式传输部分 (rtmpsink)。
问题:
是否可以存档我正在尝试做的事情?
如何归档(动态管道/探测/额外元素)?
非常感谢任何解释、示例或指向正确方向的信息。
注:
Gst 版本: gstreamer 1.3.90 (rtmpsink, faac, x264enc)
OS: ubuntu 14.04 LTS
流媒体服务器: wowza 4.x
测试应用程序(代码): link
启动后的流水线(OK): link
pipeline after rtmpsink error(Failed to write data): link
rtmpsink 错误后的日志片段(写入数据失败): link
我不确定使用单个管道获得的系统有多可靠。我建议做的是创建一个两阶段流程:
1) audio -> encode -> tee -> filesink
-> shmsink
2) shmsrc -> mux -> rtmpsink
然后为第二个管道创建包装器脚本。下面是如何将这些元素与 videotestsrc 一起使用的示例。请注意,上限非常重要——它们必须足够详细才能知道共享内存中的内容。
gst-launch-1.0 videotestsrc ! tee name=t ! queue ! videoconvert ! ximagesink t. ! video/x-raw,width=400,height=400,format=BGRA ! shmsink wait-for-connection=false socket-path=/tmp/shr
gst-launch-1.0 shmsrc socket-path=/tmp/shr ! video/x-raw,width=400,height=400,format=BGRA,framerate=30/1 ! videoconvert ! ximagesink
您也可以使用 TCP/UDP 而不是共享内存来尝试这种方法。我没有安装 faac 插件,但管道可能是这样的:
audio -> faac -> rtpmp4apay -> udpsink host=localhost port=1919
udpsrc port=1919 -> rtpmp4adepay -> mux -> rtmpsink
我也一直在尝试让管道在出错后重新连接到 RTMP 服务器。原则上我同意
我正在使用 rtmp2sink
,当它遇到错误时,它会 post 管道总线上的消息,然后 return GST_FLOW_FLUSHING 导致管道冲洗一切。这不是我感兴趣的,所以我在 rtmp2sink
前面添加了一个 GhostPad,它捕获 return 值并将其转回 GST_FLOW_OK。那时我还重置了 rtmp2sink
元素以使其重新连接。
这看起来相当可靠,至少在我使用的 RTMP 服务器上,我不需要做任何特殊的事情来处理来自编码器的关键帧。
所有这些都使用 Gstreamer 版本 1.18.5 进行了测试。这是 Python 中展示此方法的一个非常基本的示例:
#!/usr/bin/env python3
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
def _handle_message(_, message, loop):
"""handle messages posted to pipeline bus"""
if message.type == Gst.MessageType.EOS:
print("End-of-stream")
loop.quit()
elif message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
if message.src.__class__.__name__ == "GstRtmp2Sink" and err.matches(
Gst.ResourceError.quark(), Gst.ResourceError(err.code)
):
resource_error = Gst.ResourceError(err.code)
print(f"caught {resource_error} from rtmp2sink, ignoring")
else:
print(f"caught error {err} ({debug}), exiting")
loop.quit()
return True
def _wrap_rtmp_sink(rtmpsink: Gst.Element):
"""wrap RTMP sink to make it handle reconnections"""
def _chain(pad: Gst.Pad, _, buffer: Gst.Buffer):
internal_pad = pad.get_internal()
result = internal_pad.push(buffer)
if result == Gst.FlowReturn.FLUSHING or result == Gst.FlowReturn.ERROR:
print(f"Restarting RTMP sink after {result}")
rtmpsink.set_state(Gst.State.NULL)
rtmpsink.set_state(Gst.State.PLAYING)
return Gst.FlowReturn.OK
return result
sinkpad = rtmpsink.get_static_pad("sink")
peer = sinkpad.get_peer()
peer.unlink(sinkpad)
ghost_pad = Gst.GhostPad.new("proxypad", sinkpad)
ghost_pad.set_chain_function_full(_chain)
peer.link(ghost_pad)
ghost_pad.activate_mode(Gst.PadMode.PUSH, True)
# hang on to GhostPad to avoid Python garbage collecting it
rtmpsink._ghost_pad = ghost_pad
def main():
Gst.init(None)
pipeline = Gst.parse_launch(
f"""
videotestsrc
! video/x-raw,width=1280,height=720,framerate=30/1
! avenc_h264_videotoolbox
! h264parse
! flvmux.video
audiotestsrc
! faac
! flvmux.audio
flvmux name=flvmux streamable=true
! queue
! rtmp2sink name=rtmp location=rtmp://10.1.0.10/test/test
"""
)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", _handle_message, loop)
_wrap_rtmp_sink(pipeline.get_by_name("rtmp"))
pipeline.set_state(Gst.State.PLAYING)
loop.run()
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
main()