如何在 Python 和 Windows 中使用多处理从子进程中杀死父进程?
How can I kill a parent from a child process using multiprocessing in Python and Windows?
我想在其中一个子进程到达特定点时结束我的脚本。假设我有以下代码:
import multiprocessing
import time
import sys
def another_child_process (my_queue):
time.sleep(3)
my_queue.put("finish")
def my_process(my_queue):
while True:
if my_queue.empty() is False:
my_queue.get()
print("Killing the program...")
### THIS IS WHERE I WANT TO KILL MAIN PROCESS AND EXIT
sys.exit(0)
def main():
## PARENT PROCESS WHICH I WANT TO KILL FROM THE CHILD
my_queue = multiprocessing.Queue()
child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
another_process = multiprocessing.Process(target=another_child_process, args=(my_queue,))
child_process.start()
another_process.start()
while True:
pass ## I want to end the program in the child process
if __name__=="__main__":
main()
我读过一些关于使用信号的内容,但它们主要用于 Linux,我不太了解如何在 Windows 中使用它们。我在现实中是 Python 的初学者。我怎样才能完全结束脚本?
首先,如果您仔细阅读文档,您会发现 multiprocessing.Queue
上的调用方法 is_empty
不可靠,不应使用。此外,您有一个竞争条件。也就是说,如果 my_process
在 another_child_process
之前运行(假设 is_empty
是可靠的),它会发现队列为空并提前终止,因为 another_child_process
还没有机会将任何项目放入队列。所以你应该做的是让 another_child_process
把它想要的任何消息放在队列中,然后再放一个额外的 sentinel 项目,其目的是表示没有更多的项目可以将被放入队列。因此 sentinel 用作准 end-of-file 指标。您可以使用任何不同的对象作为 sentinel,只要它不能被视为“真实”数据项。在这种情况下,我们将使用 None
作为标记。
但是您编写的实际示例并不是一个真实的示例,说明为什么您需要一些特殊机制来终止主进程并退出,因为一旦 another_process
将其项目放入队列,它 returns 因此进程终止,一旦 my_process
检测到它已从队列中检索到所有项目并且不会再有,它 returns 因此其进程终止。因此,主进程所要做的就是在两个子进程上发出对 join
的调用并等待它们完成然后退出:
import multiprocessing
import time
import sys
def another_child_process (my_queue):
time.sleep(3)
my_queue.put("finish")
my_queue.put(None)
def my_process(my_queue):
while True:
item = my_queue.get()
if item is None:
break
print('Item:', item)
def main():
## PARENT PROCESS WHICH I WANT TO KILL FROM THE CHILD
my_queue = multiprocessing.Queue()
child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
another_process = multiprocessing.Process(target=another_child_process, args=(my_queue,))
child_process.start()
another_process.start()
child_process.join()
another_process.join()
if __name__=="__main__":
main()
打印:
Item: finish
这里也许是一个更好的例子。 another_child_process
以某种方式获取数据(出于演示目的,我们有一个生成器函数,get_data
)。如果没有异常情况发生,它会将所有数据放在 my_process
的队列中,以供 None
哨兵项跟随,因此 my_process
知道没有更多数据即将到来并且它可以终止.但是让我们假设有可能 get_data
产生一个特殊的异常数据项,用于演示目的的字符串 'finish'。在那种情况下 another_child_process
将立即终止。然而,此时队列中有许多 my_process
尚未检索和处理的项目。我们想立即强制终止 my_process
以便主进程可以立即 join
子进程并终止。
为此,我们将事件传递给由主进程启动的守护线程,等待事件被设置。如果事件是由 another_child_process
设置的,我们也将事件传递给它,线程将立即终止 my_process
进程:
import multiprocessing
import time
import sys
def get_data():
for item in ['a', 'b', 'c', 'finish', 'd', 'e', 'f', 'g']:
yield item
def another_child_process(my_queue, exit_event):
for item in get_data():
if item == 'finish':
# Abnormal condition where we must exit imemediately.
# Immediately signal main process terminate:
exit_event.set()
# And we terminate:
return
my_queue.put(item)
# Normal situation where we just continue
# Put in sentinel signifying no more data:
my_queue.put(None)
def my_process(my_queue):
while True:
item = my_queue.get()
if item is None: # Sentinel?
# No more data:
break
print("Got: ", repr(item))
print('my_process terminating normally.')
def main():
import threading
def wait_for_quit(exit_event):
nonlocal child_process
exit_event.wait()
child_process.terminate()
print("Exiting because event was set.")
exit_event = multiprocessing.Event()
# Start daemon thread that will wait for the quit_event
threading.Thread(target=wait_for_quit, args=(exit_event,), daemon=True).start()
my_queue = multiprocessing.Queue()
child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
another_process = multiprocessing.Process(target=another_child_process, args=(my_queue, exit_event))
child_process.start()
another_process.start()
# Wait for processes to end:
child_process.join()
another_process.join()
if __name__=="__main__":
main()
打印:
Got: 'a'
Exiting because event was set.
如果从 get_data
返回的数据中删除 finish
消息,则所有进程将正常完成,打印的内容将是:
Got: 'a'
Got: 'b'
Got: 'c'
Got: 'd'
Got: 'e'
Got: 'f'
Got: 'g'
my_process terminating normally.
我想在其中一个子进程到达特定点时结束我的脚本。假设我有以下代码:
import multiprocessing
import time
import sys
def another_child_process (my_queue):
time.sleep(3)
my_queue.put("finish")
def my_process(my_queue):
while True:
if my_queue.empty() is False:
my_queue.get()
print("Killing the program...")
### THIS IS WHERE I WANT TO KILL MAIN PROCESS AND EXIT
sys.exit(0)
def main():
## PARENT PROCESS WHICH I WANT TO KILL FROM THE CHILD
my_queue = multiprocessing.Queue()
child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
another_process = multiprocessing.Process(target=another_child_process, args=(my_queue,))
child_process.start()
another_process.start()
while True:
pass ## I want to end the program in the child process
if __name__=="__main__":
main()
我读过一些关于使用信号的内容,但它们主要用于 Linux,我不太了解如何在 Windows 中使用它们。我在现实中是 Python 的初学者。我怎样才能完全结束脚本?
首先,如果您仔细阅读文档,您会发现 multiprocessing.Queue
上的调用方法 is_empty
不可靠,不应使用。此外,您有一个竞争条件。也就是说,如果 my_process
在 another_child_process
之前运行(假设 is_empty
是可靠的),它会发现队列为空并提前终止,因为 another_child_process
还没有机会将任何项目放入队列。所以你应该做的是让 another_child_process
把它想要的任何消息放在队列中,然后再放一个额外的 sentinel 项目,其目的是表示没有更多的项目可以将被放入队列。因此 sentinel 用作准 end-of-file 指标。您可以使用任何不同的对象作为 sentinel,只要它不能被视为“真实”数据项。在这种情况下,我们将使用 None
作为标记。
但是您编写的实际示例并不是一个真实的示例,说明为什么您需要一些特殊机制来终止主进程并退出,因为一旦 another_process
将其项目放入队列,它 returns 因此进程终止,一旦 my_process
检测到它已从队列中检索到所有项目并且不会再有,它 returns 因此其进程终止。因此,主进程所要做的就是在两个子进程上发出对 join
的调用并等待它们完成然后退出:
import multiprocessing
import time
import sys
def another_child_process (my_queue):
time.sleep(3)
my_queue.put("finish")
my_queue.put(None)
def my_process(my_queue):
while True:
item = my_queue.get()
if item is None:
break
print('Item:', item)
def main():
## PARENT PROCESS WHICH I WANT TO KILL FROM THE CHILD
my_queue = multiprocessing.Queue()
child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
another_process = multiprocessing.Process(target=another_child_process, args=(my_queue,))
child_process.start()
another_process.start()
child_process.join()
another_process.join()
if __name__=="__main__":
main()
打印:
Item: finish
这里也许是一个更好的例子。 another_child_process
以某种方式获取数据(出于演示目的,我们有一个生成器函数,get_data
)。如果没有异常情况发生,它会将所有数据放在 my_process
的队列中,以供 None
哨兵项跟随,因此 my_process
知道没有更多数据即将到来并且它可以终止.但是让我们假设有可能 get_data
产生一个特殊的异常数据项,用于演示目的的字符串 'finish'。在那种情况下 another_child_process
将立即终止。然而,此时队列中有许多 my_process
尚未检索和处理的项目。我们想立即强制终止 my_process
以便主进程可以立即 join
子进程并终止。
为此,我们将事件传递给由主进程启动的守护线程,等待事件被设置。如果事件是由 another_child_process
设置的,我们也将事件传递给它,线程将立即终止 my_process
进程:
import multiprocessing
import time
import sys
def get_data():
for item in ['a', 'b', 'c', 'finish', 'd', 'e', 'f', 'g']:
yield item
def another_child_process(my_queue, exit_event):
for item in get_data():
if item == 'finish':
# Abnormal condition where we must exit imemediately.
# Immediately signal main process terminate:
exit_event.set()
# And we terminate:
return
my_queue.put(item)
# Normal situation where we just continue
# Put in sentinel signifying no more data:
my_queue.put(None)
def my_process(my_queue):
while True:
item = my_queue.get()
if item is None: # Sentinel?
# No more data:
break
print("Got: ", repr(item))
print('my_process terminating normally.')
def main():
import threading
def wait_for_quit(exit_event):
nonlocal child_process
exit_event.wait()
child_process.terminate()
print("Exiting because event was set.")
exit_event = multiprocessing.Event()
# Start daemon thread that will wait for the quit_event
threading.Thread(target=wait_for_quit, args=(exit_event,), daemon=True).start()
my_queue = multiprocessing.Queue()
child_process = multiprocessing.Process(target=my_process, args=(my_queue,))
another_process = multiprocessing.Process(target=another_child_process, args=(my_queue, exit_event))
child_process.start()
another_process.start()
# Wait for processes to end:
child_process.join()
another_process.join()
if __name__=="__main__":
main()
打印:
Got: 'a'
Exiting because event was set.
如果从 get_data
返回的数据中删除 finish
消息,则所有进程将正常完成,打印的内容将是:
Got: 'a'
Got: 'b'
Got: 'c'
Got: 'd'
Got: 'e'
Got: 'f'
Got: 'g'
my_process terminating normally.