asyncio 在超时时终止子进程
asyncio terminate subprocess on timeout
我有一个必须执行某些 shell 命令的脚本。但是,如果命令需要很长时间才能完成,则必须强行将其杀死。考虑以下代码片段:
import asyncio, random
q = asyncio.Queue()
MAX_WAIT = 5
@asyncio.coroutine
def blocking_task(sec):
print('This task will sleep {} sec.'.format(sec))
create = asyncio.create_subprocess_shell(
'sleep {s}; echo "Woke up after {s} sec." >> ./tst.log'.format(s=sec),
stdout=asyncio.subprocess.PIPE)
proc = yield from create
yield from proc.wait()
@asyncio.coroutine
def produce():
while True:
q.put_nowait(random.randint(3,8))
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
try:
yield from asyncio.wait_for(blocking_task(value), MAX_WAIT)
except asyncio.TimeoutError:
print('~/~ Job has been cancelled !!')
else:
print('=/= Job has been done :]')
loop = asyncio.get_event_loop()
asyncio.ensure_future(produce())
asyncio.ensure_future(consume())
loop.run_forever()
此代码产生以下输出:
This task will sleep 4 sec.
=/= Job has been done :]
This task will sleep 8 sec.
~/~ Job has been cancelled !!
This task will sleep 5 sec.
~/~ Job has been cancelled !!
看来它正在按预期工作,如果作业完成时间太长,就会停止。但是,如果我检查日志,我可以确认无论多么耗时的任务仍在继续 运行 并且实际上并未停止/终止/中止:
Woke up after 4 sec.
Woke up after 8 sec.
Woke up after 5 sec.
我希望日志中应该只有一行,因为其他进程必须在它们有机会完成之前中止:
Woke up after 4 sec.
有没有办法实现我想要的?
我什至不确定这里是否需要asyncio
,也许concurrent.futures
也可以使用。无论哪种方式,任务都是相同的 - 终止任务,这需要太多时间才能完成。
您可以使用 Process.terminate:
try:
yield from proc.wait()
except asyncio.CancelledError:
proc.terminate()
raise
或者:
try:
yield from proc.wait()
finally:
if proc.returncode is None:
proc.terminate()
编辑
Why I didn't see asyncio.CancelledError raised in my code?
当asyncio.wait_for
(或其他任何东西)取消任务时,它会在相应的协程中抛出一个CancelledError
。这允许协程在必要时执行一些清理(例如使用上下文管理器或 try/finally
子句)。不需要记录此错误,因为它是已取消任务的正常行为。但在取消任务后尝试等待任务,将引发 CancelledError
。
我有一个必须执行某些 shell 命令的脚本。但是,如果命令需要很长时间才能完成,则必须强行将其杀死。考虑以下代码片段:
import asyncio, random
q = asyncio.Queue()
MAX_WAIT = 5
@asyncio.coroutine
def blocking_task(sec):
print('This task will sleep {} sec.'.format(sec))
create = asyncio.create_subprocess_shell(
'sleep {s}; echo "Woke up after {s} sec." >> ./tst.log'.format(s=sec),
stdout=asyncio.subprocess.PIPE)
proc = yield from create
yield from proc.wait()
@asyncio.coroutine
def produce():
while True:
q.put_nowait(random.randint(3,8))
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
try:
yield from asyncio.wait_for(blocking_task(value), MAX_WAIT)
except asyncio.TimeoutError:
print('~/~ Job has been cancelled !!')
else:
print('=/= Job has been done :]')
loop = asyncio.get_event_loop()
asyncio.ensure_future(produce())
asyncio.ensure_future(consume())
loop.run_forever()
此代码产生以下输出:
This task will sleep 4 sec.
=/= Job has been done :]
This task will sleep 8 sec.
~/~ Job has been cancelled !!
This task will sleep 5 sec.
~/~ Job has been cancelled !!
看来它正在按预期工作,如果作业完成时间太长,就会停止。但是,如果我检查日志,我可以确认无论多么耗时的任务仍在继续 运行 并且实际上并未停止/终止/中止:
Woke up after 4 sec.
Woke up after 8 sec.
Woke up after 5 sec.
我希望日志中应该只有一行,因为其他进程必须在它们有机会完成之前中止:
Woke up after 4 sec.
有没有办法实现我想要的?
我什至不确定这里是否需要asyncio
,也许concurrent.futures
也可以使用。无论哪种方式,任务都是相同的 - 终止任务,这需要太多时间才能完成。
您可以使用 Process.terminate:
try:
yield from proc.wait()
except asyncio.CancelledError:
proc.terminate()
raise
或者:
try:
yield from proc.wait()
finally:
if proc.returncode is None:
proc.terminate()
编辑
Why I didn't see asyncio.CancelledError raised in my code?
当asyncio.wait_for
(或其他任何东西)取消任务时,它会在相应的协程中抛出一个CancelledError
。这允许协程在必要时执行一些清理(例如使用上下文管理器或 try/finally
子句)。不需要记录此错误,因为它是已取消任务的正常行为。但在取消任务后尝试等待任务,将引发 CancelledError
。