如何在 Python 和 Windows 中使用多处理从子进程中杀死父进程?

How can I kill a parent from a child process using multiprocessing in Python and Windows?

我想在其中一个子进程到达特定点时结束我的脚本。假设我有以下代码:

import multiprocessing
import time
import sys


def another_child_process (my_queue):
    time.sleep(3)
    my_queue.put("finish")

def my_process(my_queue):
    while True:
        if my_queue.empty() is False:
            my_queue.get()
            print("Killing the program...")
            ### THIS IS WHERE I WANT TO KILL MAIN PROCESS AND EXIT
            sys.exit(0) 


def main():

    ## PARENT PROCESS WHICH I WANT TO KILL FROM THE CHILD

    my_queue = multiprocessing.Queue()

    child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
    another_process = multiprocessing.Process(target=another_child_process, args=(my_queue,))

    child_process.start()
    another_process.start()

    while True:
        pass ## I want to end the program in the child process

       
if __name__=="__main__":
    main()

我读过一些关于使用信号的内容,但它们主要用于 Linux,我不太了解如何在 Windows 中使用它们。我在现实中是 Python 的初学者。我怎样才能完全结束脚本?

首先,如果您仔细阅读文档,您会发现 multiprocessing.Queue 上的调用方法 is_empty 不可靠,不应使用。此外,您有一个竞争条件。也就是说,如果 my_processanother_child_process 之前运行(假设 is_empty 是可靠的),它会发现队列为空并提前终止,因为 another_child_process 还没有机会将任何项目放入队列。所以你应该做的是让 another_child_process 把它想要的任何消息放在队列中,然后再放一个额外的 sentinel 项目,其目的是表示没有更多的项目可以将被放入队列。因此 sentinel 用作准 end-of-file 指标。您可以使用任何不同的对象作为 sentinel,只要它不能被视为“真实”数据项。在这种情况下,我们将使用 None 作为标记。

但是您编写的实际示例并不是一个真实的示例,说明为什么您需要一些特殊机制来终止主进程并退出,因为一旦 another_process 将其项目放入队列,它 returns 因此进程终止,一旦 my_process 检测到它已从队列中检索到所有项目并且不会再有,它 returns 因此其进程终止。因此,主进程所要做的就是在两个子进程上发出对 join 的调用并等待它们完成然后退出:

import multiprocessing
import time
import sys

def another_child_process (my_queue):
    time.sleep(3)
    my_queue.put("finish")
    my_queue.put(None)

def my_process(my_queue):
    while True:
        item = my_queue.get()
        if item is None:
            break
        print('Item:', item)


def main():

    ## PARENT PROCESS WHICH I WANT TO KILL FROM THE CHILD

    my_queue = multiprocessing.Queue()

    child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
    another_process = multiprocessing.Process(target=another_child_process, args=(my_queue,))

    child_process.start()
    another_process.start()

    child_process.join()
    another_process.join()


if __name__=="__main__":
    main()

打印:

Item: finish

这里也许是一个更好的例子。 another_child_process 以某种方式获取数据(出于演示目的,我们有一个生成器函数,get_data)。如果没有异常情况发生,它会将所有数据放在 my_process 的队列中,以供 None 哨兵项跟随,因此 my_process 知道没有更多数据即将到来并且它可以终止.但是让我们假设有可能 get_data 产生一个特殊的异常数据项,用于演示目的的字符串 'finish'。在那种情况下 another_child_process 将立即终止。然而,此时队列中有许多 my_process 尚未检索和处理的项目。我们想立即强制终止 my_process 以便主进程可以立即 join 子进程并终止。

为此,我们将事件传递给由主进程启动的守护线程,等待事件被设置。如果事件是由 another_child_process 设置的,我们也将事件传递给它,线程将立即终止 my_process 进程:

import multiprocessing
import time
import sys

def get_data():
    for item in ['a', 'b', 'c', 'finish', 'd', 'e', 'f', 'g']:
        yield item

def another_child_process(my_queue, exit_event):
    for item in get_data():
        if item == 'finish':
            # Abnormal condition where we must exit imemediately.
            # Immediately signal main process terminate:
            exit_event.set()
            # And we terminate:
            return
        my_queue.put(item)
    # Normal situation where we just continue
    # Put in sentinel signifying no more data:
    my_queue.put(None)

def my_process(my_queue):
    while True:
        item = my_queue.get()
        if item is None: # Sentinel?
            # No more data:
            break
        print("Got: ", repr(item))
    print('my_process terminating normally.')

def main():
    import threading

    def wait_for_quit(exit_event):
        nonlocal child_process

        exit_event.wait()
        child_process.terminate()
        print("Exiting because event was set.")

    exit_event = multiprocessing.Event()

    # Start daemon thread that will wait for the quit_event
    threading.Thread(target=wait_for_quit, args=(exit_event,), daemon=True).start()

    my_queue = multiprocessing.Queue()
    child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
    another_process = multiprocessing.Process(target=another_child_process, args=(my_queue, exit_event))

    child_process.start()
    another_process.start()

    # Wait for processes to end:
    child_process.join()
    another_process.join()


if __name__=="__main__":
    main()

打印:

Got:  'a'
Exiting because event was set.

如果从 get_data 返回的数据中删除 finish 消息,则所有进程将正常完成,打印的内容将是:

Got:  'a'
Got:  'b'
Got:  'c'
Got:  'd'
Got:  'e'
Got:  'f'
Got:  'g'
my_process terminating normally.