在 QThread 中创建 GStreamer 管道时未收到消息

Messages are not received when GStreamer pipeline is created in a QThread

我有一个 PyQt 应用程序,它在用户按下按钮并侦听该管道总线上的消息时创建一个 GStreamer 管道。

import gi

gi.require_version("Gst", "1.0")

from gi.repository import Gst, GLib
from PyQt5.QtWidgets import QApplication, QPushButton


Gst.init()

pipeline = None


def on_pipeline_message(bus, message):
    print("Got a message from pipeline:", message.type)
    return True


def on_button_press():
    global pipeline

    pipeline = Gst.parse_launch("videotestsrc ! xvimagesink")
    pipeline.bus.add_watch(GLib.PRIORITY_DEFAULT, on_pipeline_message)
    pipeline.set_state(Gst.State.PLAYING)


app = QApplication([])

playback_button = QPushButton("Press to Start Playback", None)
playback_button.clicked.connect(on_button_press)
playback_button.show()

app.exec()

以上代码按预期工作,我的 on_pipeline_message 回调函数被调用。但是,如果我决定将管道创建代码移动到单独的 QThread 中:

class MakePipelineThread(QThread):
    def run(self):
        global pipeline

        pipeline = Gst.parse_launch("videotestsrc ! xvimagesink")
        pipeline.bus.add_watch(GLib.PRIORITY_DEFAULT, on_pipeline_message)
        pipeline.set_state(Gst.State.PLAYING)

... 并在按下按钮时启动该 QThread:

make_pipeline_thread = MakePipelineThread()


def on_button_press():
    make_pipeline_thread.start()

我的 on_pipeline_message 回调不再是 运行。为什么我在单独的 QThread 中创建管道很重要?如何继续接收消息?

GStreamer 和 Qt 都使用 GLib.MainContext 类型来异步处理发送方和接收方之间的消息传输1。默认情况下,GStreamer 和 Qt 都通过全局默认 MainContext 实例传输消息,可通过 GLib.MainContext.default() 访问。发送消息时,无论是来自用户输入、管道还是其他任何地方,它们最初都存储在消息队列中。 Qt 定期迭代 MainContext,它从队列中提取消息并将它们发送给任何侦听器。这就是为什么当管道在 UI 线程上启动时,您能够从 GStreamer 管道接收消息。

但是,当 Qt 启动一个新的 QThread 时,它还会创建一个新的 MainContext 对象并将其设置为该线程的默认上下文。当您在 QThread 中创建 GStreamer 管道时,您的管道和观察器将注册到该上下文而不是全局默认值。 Qt 不会自动为您迭代 QThread 的 MainContext,因此 除非您自己迭代上下文,否则不会收到任何消息。这可以通过在 QThread 中调用 QCoreApplication.processEvents() 来完成。

class MakePipelineThread(QThread):
    def run(self):
        global pipeline

        pipeline = Gst.parse_launch("videotestsrc ! xvimagesink")
        pipeline.bus.add_watch(GLib.PRIORITY_DEFAULT, on_pipeline_message)
        pipeline.set_state(Gst.State.PLAYING)

        # Process events until the pipeline reaches the null state
        _, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)
        while state != Gst.State.NULL:
            QCoreApplication.processEvents()
            _, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)

这当然意味着只要管道 运行s,QThread 就会 运行,而不是管道一构建就停止。

或者,您可以使用 set_sync_handler 而不是 add_watch。这告诉总线 运行 您的回调立即在发送消息的同一线程上,而不是通过 MainContext 异步发送消息。

class MakePipelineThread(QThread):
    def run(self):
        global pipeline

        pipeline = Gst.parse_launch("videotestsrc ! xvimagesink")
        pipeline.bus.set_sync_handler(on_pipeline_message)
        pipeline.set_state(Gst.State.PLAYING)

这消除了完全迭代 MainContext 的需要,但这意味着您的回调将 运行 在 GStreamer 的 "streaming threads" 之一中,并且会阻止该线程在您的回调执行时执行其他工作运行宁.


1 某些平台可能会在没有 GLib 支持的情况下编译 Qt,在这种情况下 Qt 将使用自己的事件处理系统。在那种情况下,只要应用程序自己迭代全局默认上下文,就不会出现此问题。您可以将 QT_NO_GLIB 环境变量设置为 1 以强制 Qt 在 运行 时不使用 GLib。