从父进程在子进程中为 mainloop 运行 调用 GMainLoop.quit()

Calling GMainLoop.quit() for mainloop running in child process, from parent process

我需要 运行 gstreamer 管道来执行视频流。 GStreamer 管道需要一个 GObject.MainLoop 对象,它有一个 run() 方法,该方法在调用 quit() 之前不会终止。 为此,我从我的主应用程序进程 (P1) 创建了一个进程 (P2),它 运行 是其主线程中的 GObject.MainLoop 实例。问题是循环在进程 P2 中无限期地进行,我无法从主应用程序进程 (P1) exit/quit 它。

以下是可能有助于理解场景的代码部分。

'''
start() spawns a new process P2 that runs Mainloop within its main thread.
stop() is called from P1, but does not quit the Mainloop. This is probably because 
processes do not have shared memory
'''
from multiprocessing import Process
import gi

from gi.repository import GObject

class Main:
    def __init__(self):
        self.process = None
        self.loop = GObject.MainLoop()

    def worker(self):
        self.loop.run()

    def start(self):
        self.process=Process(target=self.worker, args=())
        self.process.start()

    def stop(self):
        self.loop.quit()

接下来,我尝试使用多处理队列在进程之间共享 'loop' 变量,但我仍然无法退出主循环。

'''
start() spawns a new process and puts the loop object in a multiprocessing Queue
stop() calls get() from the loop and calls the quit() method, though it still does not quit the mainloop.
'''
from multiprocessing import Process, Queue
import gi

from gi.repository import GObject

class Main:
    def __init__(self):
        self.p=None
        self.loop = GObject.MainLoop()
        self.queue = Queue()

    def worker(self):
        self.queue.put(self.loop)
        self.loop.run()


    def start(self):
        self.p=Process(target=self.worker, args=())
        self.p.start()

    def stop(self):
        # receive loop instance shared by Child Process
        loop=self.queue.get()
        loop.quit()

如何调用只能在子进程 P2 中访问的 MainLoop 对象的退出方法?

好的,首先我们需要使用线程而不是进程。进程将位于不同的地址 space.

What is the difference between a process and a thread?

尝试将主循环对象传递给执行实际工作的单独线程。这将使您的 main 方法变成一个基本的 GLib 事件处理循环,但这很好,并且是许多 GLib 应用程序中的正常行为。

最后,我们需要处理子进程在主循环激活之前完成其工作的竞争条件。我们使用 while not loop.is_running() 片段来做到这一点。

from threading import Thread
import gi

from gi.repository import GObject

def worker(loop):
    while not loop.is_running():
        print("waiting for loop to run")

    print("working")

    loop.quit()
    print("quitting")

class Main:
    def __init__(self):
        self.thread = None
        self.loop = GObject.MainLoop()

    def start(self):
        self.thread=Thread(target=worker, args=(self.loop,))
        self.thread.start()
        self.loop.run()

def main():
    GObject.threads_init()

    m = Main()
    m.start()

if  __name__ =='__main__' : main()

我在我的 class Main 中扩展了 multiprocessing.Process 模块并覆盖了它的 run() 方法实际上 运行 另一个里面的 GObject.Mainloop 实例线程 (T1) 而不是它的 main thread。然后实现了一个wait-notify机制,使Process(P2)的main thread进入wait-notify循环,并使用multiprocessing.Queue转发消息到主线程P2P2会同时收到通知。例如,stop() 方法,它将向 P2 发送 quit 消息,在覆盖的 run() 方法中为其定义了处理程序。 可以扩展此模块以将任意数量的消息解析到 Child Process,前提是它们的处理程序也要定义。

以下是我使用的代码片段。

from multiprocessing import Process, Condition, Queue
from threading import Thread
import gi

from gi.repository import GObject

loop=GObject.MainLoop()

def worker():
    loop.run()

class Main(Process):

    def __init__(self, target=None, args=()):
        self.target=target
        self.args=tuple(args)
        print self.args
        self.message_queue = Queue()
        self.cond = Condition()
        self.thread = None
        self.loop = GObject.MainLoop()
        Process.__init__(self)

    def run(self):
        if self.target:
            self.thread = Thread(target=self.target, args=())
            print "running target method"
            self.thread.start()

        while True:
            with self.cond:
                self.cond.wait()
            msg = self.message_queue.get()
            if msg == 'quit':
                print loop.is_running()
                loop.quit()
                print loop.is_running()
                break
            else:
                print 'message received', msg

    def send_message(self, msg):
        self.message_queue.put(msg)
        with self.cond:
            self.cond.notify_all()

    def stop(self):
        self.send_message("quit")
        self.join()

    def func1(self):
        self.send_message("msg 1") # handler is defined in the overridden run method

    # few others functions which will send unique messages to the process, and their handlers 
    # are defined in the overridden run method above

这个方法在我的场景下效果很好,但如果有更好的方法,欢迎提出建议。