在 Python 中扩展 mp.Process 3

Extending mp.Process in Python 3

import multiprocessing as mp
import time as t

class MyProcess(mp.Process):

    def __init__(self, target, args, name):
        mp.Process.__init__(self, target=target, args=args)
        self.exit = mp.Event()
        self.name = name
        print("{0} initiated".format(self.name))

    def run(self):
        while not self.exit.is_set():
            pass
        print("Process {0} exited.".format(self.name))

    def shutdown(self):
        print("Shutdown initiated for {0}.".format(self.name))
        self.exit.set()

def f(x):
    while True:
        print(x)
        x = x+1

if __name__ == "__main__":
    p = MyProcess(target=f, args=[3], name="function")
    p.start()
    #p.join()
    t.wait(2)
    p.shutdown()

我正在尝试扩展 multiprocessing.Process class 以添加关闭方法,以便能够退出可能必须 运行 未定义的函数多少时间。按照 Python Multiprocessing Exit Elegantly How? 的说明并添加我自己想出的传递参数,只得到这个输出:

function initiated
Shutdown initiated for function.
Process function exited.

但没有实际方法 f(x) 输出。似乎实际的流程目标没有启动。我显然做错了什么,但就是想不通是什么,有什么想法吗?

谢谢!

处理这种情况的明智方法是,在可能的情况下,通过定期检查 exit 事件,让后台任务配合退出机制。为此,无需子类化 Process:您可以重写后台任务以包含该检查。例如,这是使用该方法重写的代码:

import multiprocessing as mp
import time as t

def f(x, exit_event):
    while not exit_event.is_set():
        print(x)
        x = x+1
    print("Exiting")

if __name__ == "__main__":
    exit_event = mp.Event()
    p = mp.Process(target=f, args=(3, exit_event), name="function")
    p.start()
    t.sleep(2)
    exit_event.set()
    p.join()

如果这不是一个选项(例如,因为您无法在后台作业中修改正在 运行 的代码),那么您可以使用 Process.terminate 方法。但是您应该知道使用它是危险的:child 进程将没有机会正确清理,因此例如,如果它在持有多处理锁时关闭,则没有其他进程能够获得该锁锁,存在死锁的风险。如果可能的话,让 child 配合关闭会更好。

这个问题的解决方法是在你的 class 运行 方法中调用 super().run() 函数。

当然,由于while True的存在,这会导致你的函数永久执行,而指定的事件不会导致它结束。

您可以使用Process.terminate()方法结束您的进程。

import multiprocessing as mp
import time as t
   
   
class MyProcess(mp.Process):
    def __init__(self, target, args, name):
        mp.Process.__init__(self, target=target, args=args)
        self.name = name
        print("{0} initiated".format(self.name))

    def run(self):
        print("Process {0} started.".format(self.name))
        super().run()

    def shutdown(self):
        print("Shutdown initiated for {0}.".format(self.name))
        self.terminate()
   
   
def f(x):
    while True:
        print(x)
        t.sleep(1)
        x += 1


if __name__ == "__main__":
    p = MyProcess(target=f, args=(3,), name="function")
    p.start()
    # p.join()
    t.sleep(5)
    p.shutdown()