asyncio 在超时时终止子进程

asyncio terminate subprocess on timeout

我有一个必须执行某些 shell 命令的脚本。但是,如果命令需要很长时间才能完成,则必须强行将其杀死。考虑以下代码片段:

import asyncio, random

q = asyncio.Queue()

MAX_WAIT = 5

@asyncio.coroutine
def blocking_task(sec):
    print('This task will sleep {} sec.'.format(sec))
    create = asyncio.create_subprocess_shell(
       'sleep {s}; echo "Woke up after {s} sec." >> ./tst.log'.format(s=sec),
        stdout=asyncio.subprocess.PIPE)
    proc = yield from create
    yield from proc.wait()

@asyncio.coroutine
def produce():
    while True:
        q.put_nowait(random.randint(3,8))
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        try:
            yield from asyncio.wait_for(blocking_task(value), MAX_WAIT)
        except asyncio.TimeoutError:
            print('~/~ Job has been cancelled !!')
        else:
            print('=/= Job has been done :]')


loop = asyncio.get_event_loop()
asyncio.ensure_future(produce())
asyncio.ensure_future(consume())
loop.run_forever()

此代码产生以下输出:

This task will sleep 4 sec.
=/= Job has been done :]
This task will sleep 8 sec.
~/~ Job has been cancelled !!
This task will sleep 5 sec.
~/~ Job has been cancelled !!

看来它正在按预期工作,如果作业完成时间太长,就会停止。但是,如果我检查日志,我可以确认无论多么耗时的任务仍在继续 运行 并且实际上并未停止/终止/中止:

Woke up after 4 sec.
Woke up after 8 sec.
Woke up after 5 sec.

我希望日志中应该只有一行,因为其他进程必须在它们有机会完成之前中止:

Woke up after 4 sec.

有没有办法实现我想要的?

我什至不确定这里是否需要asyncio,也许concurrent.futures也可以使用。无论哪种方式,任务都是相同的 - 终止任务,这需要太多时间才能完成。

您可以使用 Process.terminate:

try:
    yield from proc.wait()
except asyncio.CancelledError:
    proc.terminate()
    raise

或者:

try:
    yield from proc.wait()
finally:
    if proc.returncode is None:
        proc.terminate()

编辑

Why I didn't see asyncio.CancelledError raised in my code?

asyncio.wait_for(或其他任何东西)取消任务时,它会在相应的协程中抛出一个CancelledError。这允许协程在必要时执行一些清理(例如使用上下文管理器或 try/finally 子句)。不需要记录此错误,因为它是已取消任务的正常行为。但在取消任务后尝试等待任务,将引发 CancelledError