处理在 python3 中永不终止的任务 asyncio

Handling tasks that never terminate in python3 asyncio

有时异步任务没有有意义的终止条件 - 例如,在下面的程序中,"rate_limiter" 任务在队列中以固定速率永远生成令牌流。

import asyncio
import sys

@asyncio.coroutine
def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1/rate
    while True:
        yield from asyncio.sleep(delay)
        yield from queue.put(None)

@asyncio.coroutine
def do_work(n, rate):
    for i in range(n):
        yield from rate.get()
        sys.stdout.write("job {}\n".format(i))

def main():
    loop   = asyncio.get_event_loop()
    rate   = asyncio.Queue()
    rltask = loop.create_task(rate_limiter(rate, 10))
    wtask  = loop.create_task(do_work(20, rate))
    loop.run_until_complete(wtask)

main()

这个程序运行完美 除了 asyncio 库认为它是一个编程错误,当没有什么可以限制速率时直接丢弃 rltask;你收到类似

的投诉
...
job 18
job 19
Task was destroyed but it is pending!
task: <Task pending coro=<rate_limiter() running at rl.py:9>
      wait_for=<Future pending cb=[Task._wakeup()]>>

(是否处于调试模式)。

我可以解决这个问题,例如,告诉 rate_limiter 协同程序跳出其循环的事件,但这感觉像是没有实际好处的额外代码。使用 asyncio 时,您应该如何处理这种情况?

编辑: 我不清楚:我正在寻找的是线程上的 daemon 标志之类的东西:使它成为现实的东西所以我 不必等待特定任务, 理想地表示为任务本身或其协同程序的注释。我也会接受一个证明没有这种机制的答案。我已经知道解决方法。

.cancel() 任务然后等待它被取消,在外面捕获 CancelledError:

# vim: tabstop=4 expandtab

import asyncio
import sys

@asyncio.coroutine
def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1/rate
    while True:
        yield from asyncio.sleep(delay)
        yield from queue.put(None)

@asyncio.coroutine
def do_work(n, rate):
    for i in range(n):
        yield from rate.get()
        sys.stdout.write("job {}\n".format(i))

def main():
    loop   = asyncio.get_event_loop()
    rate   = asyncio.Queue()
    rltask = loop.create_task(rate_limiter(rate, 10))
    wtask  = loop.create_task(do_work(20, rate))
    loop.run_until_complete(wtask)
    rltask.cancel()
    try:
        loop.run_until_complete(rltask)
    except asyncio.CancelledError:
        ...
    loop.close()

main()

为了避免 "Task was destroyed but it is pending!" 警告,如果您为相应的未来对象设置虚拟结果,则可以在退出程序时将永不结束的协程标记为已完成:

#!/usr/bin/env python3.5
import asyncio
import itertools
from contextlib import closing, contextmanager


@contextmanager
def finishing(coro_or_future, *, loop=None):
    """Mark a never ending coroutine or future as done on __exit__."""
    fut = asyncio.ensure_future(
        coro_or_future, loop=loop)  # start infinite loop
    try:
        yield
    finally:
        if not fut.cancelled():
            fut.set_result(None)  # mark as finished


async def never_ends():
    for c in itertools.cycle('\|/-'):
        print(c, end='\r', flush=True)
        await asyncio.sleep(.3)


with closing(asyncio.get_event_loop()) as loop, \
     finishing(never_ends(), loop=loop):
    loop.run_until_complete(asyncio.sleep(3))  # do something else

它假定您的协程在进程退出之前不需要显式清理。在后一种情况下,为清理定义一个显式过程:提供可以调用的方法(例如,server.close()server.wait_closed()),或传递一个调用者应该调用的事件(asyncio.Event)在关闭时设置,或引发异常(例如 CancelledError)。

引入 finishing() 的好处是检测错误,即您不应忽略警告,除非它被 finishing() 调用明确静音。