gstreamer 中的无缝视频循环

Seamless video loop in gstreamer

我正在尝试使用 gstreamer 循环播放视频,它是 python 绑定。第一次尝试是挂钩 EOS message 并为管道生成搜索消息:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.EOS:  # End-Of-Stream: loop the video, seek to beginning
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.FLUSH,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("ERROR", message)
                break
        time.sleep(0.01) # Tried 0.001 - same result

if __name__ == "__main__":
    main()

它实际上工作得很好,除了一件事 - 搜索到开头并不是真的无缝。我可以看到微小的故障。因为视频是无限动画,所以这个小故障实际上变得很明显。我的第二次尝试是对解码帧使用队列并挂钩 EOS event:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def cb_event(pad, info, *user_data):
    event = info.get_event()
    if event is not None and event.type == Gst.EventType.EOS:
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.PASS

def main():
    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, cb_event)

    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()

在第一个 EOS 事件之后,播放就停止了。我尝试了几种不同的方法,例如:pass EOS 事件,drop EOS 并向解码器的源板添加偏移量,将搜索事件发送到管道本身和其他。但我无法让它工作。

为了理解,我还尝试启用调试模式并使用 pad 探针编写我自己的管道记录器 activity。调试模式不是很有用,日志非常庞大并且缺少一些细节。我自己的日志包括 upstream/downstream 事件和缓冲区计时信息。但是,我仍然不明白哪里出了问题以及如何让它工作。

显然,我不仅遗漏了一些东西,而且不了解有关 gstreamer 管道工作原理的一些基本知识。

所以,问题是: 我应该如何处理第二版代码才能使其正常工作?
其他问题:是否有一些工具或技术可以清楚地了解管道及其包含的元素内部发生的情况?

非常感谢详细的回答。对我来说,理解我做错了什么比让程序正常工作更重要。

p.s. 程序在 NanoPi S2 板的 GNU/Linux 下 运行。视频存储在 MP4 容器中(无音频)并使用 h264 压缩。请随意 post 任何语言的代码示例,不一定 Python.

我建议查看 gst-play-1.0 应用程序以及 GStreamer 的 playbin 元素。

看这里:https://github.com/GStreamer/gst-plugins-base/blob/master/tools/gst-play.c

这个支持 --gapless 选项来无间断地播放很多文件。它利用 playbin 元素的 about-to-finish 信号。

这个特定的应用程序使用多个文件而不是相同的文件来执行此操作,但我想您可以尝试多次提供相同的文件以进行测试,如果它真的是无缝的或表现出与您的方法 1) 相同的问题。

基本上我认为 EOS 只是有点太晚了,因为解码器 deinit/init/processing 和冲洗管道而无法及时再次准备好第一帧。此外,刷新会重置您的流,管道再次进入预滚动并同步到新时钟。真的不是从里面源源不断

或者,GStreamer 编辑服务也可以做到这一点。但这可能适用于多个轨道,这意味着它可能会尝试同时实例化多个解码器实例以进行并行处理——这可能是您板上的一个问题。

最后的手段可能是将 MP4 解复用为原始比特流,将比特流连续循环到套接字中并从中解码。然后它将显示为正在播放的无限比特流。

编辑: 或许也值得一试 multifilesrc 及其 loop 属性,看看它是否可以无缝运行或是否也必须在文件之间执行刷新。

嗯,好的。我没有得到 答案 所以我继续研究并最终找到了解决方案。 下面我将展示两种不同的方法。首先 - 使用工作代码示例直接回答问题。第二 - 不同的方法,这似乎更适合 gstreamer,而且绝对更简单。两者都提供了理想的结果 - 无缝视频循环。

更正代码(答案,但不是最佳方法)

变化:

  1. 添加了视频时长查询。每个循环我们都应该增加 video duration 值的时间偏移。它使模拟无限连续流成为可能。
  2. 发出的搜索事件已移至单独的线程。根据this post, we can not emit seek event from the streaming thread. Also, have a look at this file(link来自post)。
  3. 事件回调现在丢弃 FLUSH 个事件(连续流不应有 FLUSH 个事件)。
  4. 视频解码器从 nxvideodec 更改为 avdec_h264。这与最初的问题无关,已完成 for a very special reason.

代码:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time
import threading

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("avdec_h264", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

# UPD: Get video duration
pipeline0.set_state(Gst.State.PAUSED)
assert pipeline0.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.PAUSED
duration_ok, duration = pipeline0.query_duration(Gst.Format.TIME)
assert duration_ok

######################################################

seek_requested = threading.Event()
# UPD: Seek thread. Wait for seek request from callback and generate seek event
def seek_thread_func(queue_sink_pad):
    cumulative_offset = 0
    while True:
        seek_requested.wait()
        seek_requested.clear()
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        # Add offset. It is important step to ensure that downstream elements will 'see' infinite contiguous stream
        cumulative_offset += duration
        queue_sink_pad.set_offset(cumulative_offset)

def cb_event(pad, info):
    event = info.get_event()
    if event is not None:
        if event.type == Gst.EventType.EOS:  # UPD: Set 'seek_requested' flag
            seek_requested.set()
            return Gst.PadProbeReturn.DROP
        elif event.type == Gst.EventType.FLUSH_START or event.type == Gst.EventType.FLUSH_STOP:  # UPD: Drop FLUSH
            return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.OK

def main():
    queue_sink_pad = queue.get_static_pad("sink")

    # UPD: Create separate 'seek thread'
    threading.Thread(target=seek_thread_func, daemon=True, args=(queue_sink_pad,)).start()

    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM | Gst.PadProbeType.EVENT_FLUSH,
                           cb_event)

    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()

此代码有效。在队列中的缓冲区仍在播放时有效地执行查找。但是,我相信它可能包含一些缺陷甚至错误。例如,SEGMENT 事件通过 RESET 标志向下游传递;这似乎不对。实现此方法的更清晰(可能更多 correct/reliable)方法是创建一个 gstreamer 插件。插件将管理事件并调整事件和缓冲区的时间戳。

但是有一个更简单和原生的解决方案:

使用分段搜索和 SEGMENT_DONE 消息

根据 documentation:

Segment seeking (using the GST_SEEK_FLAG_SEGMENT) will not emit an EOS at the end of the playback segment but will post a SEGMENT_DONE message on the bus. This message is posted by the element driving the playback in the pipeline, typically a demuxer. After receiving the message, the application can reconnect the pipeline or issue other seek events in the pipeline. Since the message is posted as early as possible in the pipeline, the application has some time to issue a new seek to make the transition seamless. Typically the allowed delay is defined by the buffer sizes of the sinks as well as the size of any queues in the pipeline.

消息 SEGMENT_DONE 确实 post 在队列变空之前编辑。这提供了足够的时间来执行下一次搜索。所以我们需要做的就是在播放的一开始就发出段搜索。然后等待 SEGMENT_DONE 消息并发送下一个 非刷新 寻求事件。 这是工作示例:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)
    pipeline0.get_state(Gst.CLOCK_TIME_NONE)
    pipeline0.seek(1.0,
                   Gst.Format.TIME,
                   Gst.SeekFlags.SEGMENT,
                   Gst.SeekType.SET, 0,
                   Gst.SeekType.NONE, 0)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.SEGMENT_DONE:
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.SEGMENT,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("bus ERROR", message)
                break
        time.sleep(0.01)

if __name__ == "__main__":
    main()

使用默认队列配置,SEGMENT_DONE 消息比播放最后一个视频帧早 post 大约 1 秒。非刷新搜索确保 none 帧将丢失。这一起提供了完美的结果 - 真正无缝的视频循环。

注意:我将管道切换到PLAYING状态,然后执行初始非刷新查找。或者我们可以将管道切换到 PAUSED 状态,执行 flushing 段查找,然后将管道切换到 PLAYING 状态。

注2:不同的来源建议略有不同的解决方案。请参阅下面的 link。


相关主题和来源:

  1. http://gstreamer-devel.966125.n4.nabble.com/Flushing-the-data-in-partial-pipeline-tp4681893p4681899.html
    • https://cgit.freedesktop.org/gstreamer/gst-editing-services/tree/plugins/nle/nlesource.c
  2. http://gstreamer-devel.966125.n4.nabble.com/Loop-a-file-using-playbin-without-artefacts-td4671952.html

我正在使用 SEGMENT_DONE 方法:

import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
Gst.init(None)

pipeline = Gst.parse_launch('uridecodebin uri=file://%s name=d d. ! autovideosink d. ! autoaudiosink' %sys.argv[1])
bus = pipeline.get_bus()
bus.add_signal_watch()

def on_segment_done(bus, msg):
    pipeline.seek(1.0,
        Gst.Format.TIME,
        Gst.SeekFlags.SEGMENT,
        Gst.SeekType.SET, 0,
        Gst.SeekType.NONE, 0)
    return True
bus.connect('message::segment-done', on_segment_done)

pipeline.set_state(Gst.State.PLAYING)
pipeline.get_state(Gst.CLOCK_TIME_NONE)
pipeline.seek(1.0,
    Gst.Format.TIME,
    Gst.SeekFlags.SEGMENT,
    Gst.SeekType.SET, 0,
    Gst.SeekType.NONE, 0)

GLib.MainLoop().run()