等待第一个子进程完成
Wait for the first subprocess to finish
我有一个 subprocess
进程的列表。我不和他们交流,只是等待。
我想等待第一个进程完成(此解决方案有效):
import subprocess
a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])
# wait for the first process to finish
while True:
over = False
for child in {a, b}:
try:
rst = child.wait(timeout=5)
except subprocess.TimeoutExpired:
continue # this subprocess is still running
if rst is not None: # subprocess is no more running
over = True
break # If either subprocess exits, so do we.
if over:
break
我不想使用 os.wait()
,因为它可能 return 来自另一个 subprocess
而不是我正在等待的列表的一部分。
一个漂亮而优雅的解决方案可能是使用 epoll
或 select 并且没有任何循环。
几乎有两种方法可以做到这一点,如果您希望命令阻塞并暂停您的程序直到它完成,请使用subprocess.call
a = subprocess.call('...')
b = subprocess.call('...')
不过我认为这不是您想要的。
如果你不想让他们暂停你的整个程序,只需要在调用另一个之前检查其中一个是否完成,你应该使用 .communicate
```py
a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])
....
for child in {a, b}:
try:
result, err = child.communicate(timeout=5)
.communicate
几乎是最优雅、最简单且推荐的解决方案
如果您不需要从进程中获取输出,Popen.poll()
似乎是检查它们是否已完成的最简单方法。下面的 while True
循环纯粹是为了演示目的:您可以决定如何在较大的程序中执行此操作(例如,在单独的线程中进行检查,在程序的其他工作之间进行检查等) .
from subprocess import Popen
from time import sleep
ps = [
Popen(['sleep', t])
for t in ('3', '5', '2')
]
while True:
exit_codes = [p.poll() for p in ps]
print(exit_codes)
if any(ec is not None for ec in exit_codes):
break
else:
sleep(1)
演示输出:
[None, None, None]
[None, None, None]
[None, None, 0]
这是一个使用 psutil 的解决方案 - 正是 针对此用例:
import subprocess
import psutil
a = subprocess.Popen(['/bin/sleep', "2"])
b = subprocess.Popen(['/bin/sleep', "4"])
procs_list = [psutil.Process(a.pid), psutil.Process(b.pid)]
def on_terminate(proc):
print("process {} terminated".format(proc))
# waits for multiple processes to terminate
gone, alive = psutil.wait_procs(procs_list, timeout=3, callback=on_terminate)
或者,如果您希望有一个循环等待其中一个过程完成:
while True:
gone, alive = psutil.wait_procs(procs_list, timeout=3, callback=on_terminate)
if len(gone)>0:
break
您可以为此使用os.wait()
。您只需循环调用它,直到它告诉您您关心的进程之一已退出。
import subprocess
a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])
# wait for the first process to finish
watched_pids = set(proc.pid for proc in (a, b))
while True:
pid, _ = os.wait()
if pid in watched_pids:
break
但是,os.wait()
的隐藏副作用是您丢失了进程的退出代码。在 os.wait()
完成后它将只是 None
,如果您稍后调用 proc.wait()
、proc.poll()
或 proc.communicate()
他们将无法找到 return 代码,默认为0。可以自己设置,但有点hacky。
def wait_and_handle_exitstatus(all_procs):
pid, status = os.wait()
for proc in all_procs:
if proc.pid == pid:
# We need to set the process's exit status now, or we
# won't be able to retrieve it later and it will be
# assumed to be 0.
# This is a kind of hacky solution, but this function has existed
# ever since subprocess was first included in the stdlib and is
# still there in 3.10+, so it *should* be pretty stable.
proc._handle_exitstatus(status)
return pid, status
然后您可以使用第一个代码块,只需将 os.wait()
替换为 wait_and_handle_exitstatus(ALL_PROCS)
。但是,您确实必须传递 wait_and_handle_exitstatus
可能是 运行 的所有子进程(Popen
对象)的列表并且您可能关心 return 的代码,因此它可以找到进程并设置它的退出代码。
我有一个 subprocess
进程的列表。我不和他们交流,只是等待。
我想等待第一个进程完成(此解决方案有效):
import subprocess
a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])
# wait for the first process to finish
while True:
over = False
for child in {a, b}:
try:
rst = child.wait(timeout=5)
except subprocess.TimeoutExpired:
continue # this subprocess is still running
if rst is not None: # subprocess is no more running
over = True
break # If either subprocess exits, so do we.
if over:
break
我不想使用 os.wait()
,因为它可能 return 来自另一个 subprocess
而不是我正在等待的列表的一部分。
一个漂亮而优雅的解决方案可能是使用 epoll
或 select 并且没有任何循环。
几乎有两种方法可以做到这一点,如果您希望命令阻塞并暂停您的程序直到它完成,请使用subprocess.call
a = subprocess.call('...')
b = subprocess.call('...')
不过我认为这不是您想要的。
如果你不想让他们暂停你的整个程序,只需要在调用另一个之前检查其中一个是否完成,你应该使用 .communicate
```py
a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])
....
for child in {a, b}:
try:
result, err = child.communicate(timeout=5)
.communicate
几乎是最优雅、最简单且推荐的解决方案
如果您不需要从进程中获取输出,Popen.poll()
似乎是检查它们是否已完成的最简单方法。下面的 while True
循环纯粹是为了演示目的:您可以决定如何在较大的程序中执行此操作(例如,在单独的线程中进行检查,在程序的其他工作之间进行检查等) .
from subprocess import Popen
from time import sleep
ps = [
Popen(['sleep', t])
for t in ('3', '5', '2')
]
while True:
exit_codes = [p.poll() for p in ps]
print(exit_codes)
if any(ec is not None for ec in exit_codes):
break
else:
sleep(1)
演示输出:
[None, None, None]
[None, None, None]
[None, None, 0]
这是一个使用 psutil 的解决方案 - 正是 针对此用例:
import subprocess
import psutil
a = subprocess.Popen(['/bin/sleep', "2"])
b = subprocess.Popen(['/bin/sleep', "4"])
procs_list = [psutil.Process(a.pid), psutil.Process(b.pid)]
def on_terminate(proc):
print("process {} terminated".format(proc))
# waits for multiple processes to terminate
gone, alive = psutil.wait_procs(procs_list, timeout=3, callback=on_terminate)
或者,如果您希望有一个循环等待其中一个过程完成:
while True:
gone, alive = psutil.wait_procs(procs_list, timeout=3, callback=on_terminate)
if len(gone)>0:
break
您可以为此使用os.wait()
。您只需循环调用它,直到它告诉您您关心的进程之一已退出。
import subprocess
a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])
# wait for the first process to finish
watched_pids = set(proc.pid for proc in (a, b))
while True:
pid, _ = os.wait()
if pid in watched_pids:
break
但是,os.wait()
的隐藏副作用是您丢失了进程的退出代码。在 os.wait()
完成后它将只是 None
,如果您稍后调用 proc.wait()
、proc.poll()
或 proc.communicate()
他们将无法找到 return 代码,默认为0。可以自己设置,但有点hacky。
def wait_and_handle_exitstatus(all_procs):
pid, status = os.wait()
for proc in all_procs:
if proc.pid == pid:
# We need to set the process's exit status now, or we
# won't be able to retrieve it later and it will be
# assumed to be 0.
# This is a kind of hacky solution, but this function has existed
# ever since subprocess was first included in the stdlib and is
# still there in 3.10+, so it *should* be pretty stable.
proc._handle_exitstatus(status)
return pid, status
然后您可以使用第一个代码块,只需将 os.wait()
替换为 wait_and_handle_exitstatus(ALL_PROCS)
。但是,您确实必须传递 wait_and_handle_exitstatus
可能是 运行 的所有子进程(Popen
对象)的列表并且您可能关心 return 的代码,因此它可以找到进程并设置它的退出代码。