处理在 python3 中永不终止的任务 asyncio
Handling tasks that never terminate in python3 asyncio
有时异步任务没有有意义的终止条件 - 例如,在下面的程序中,"rate_limiter" 任务在队列中以固定速率永远生成令牌流。
import asyncio
import sys
@asyncio.coroutine
def rate_limiter(queue, rate):
"""Push tokens to QUEUE at a rate of RATE per second."""
delay = 1/rate
while True:
yield from asyncio.sleep(delay)
yield from queue.put(None)
@asyncio.coroutine
def do_work(n, rate):
for i in range(n):
yield from rate.get()
sys.stdout.write("job {}\n".format(i))
def main():
loop = asyncio.get_event_loop()
rate = asyncio.Queue()
rltask = loop.create_task(rate_limiter(rate, 10))
wtask = loop.create_task(do_work(20, rate))
loop.run_until_complete(wtask)
main()
这个程序运行完美 除了 asyncio 库认为它是一个编程错误,当没有什么可以限制速率时直接丢弃 rltask
;你收到类似
的投诉
...
job 18
job 19
Task was destroyed but it is pending!
task: <Task pending coro=<rate_limiter() running at rl.py:9>
wait_for=<Future pending cb=[Task._wakeup()]>>
(是否处于调试模式)。
我可以解决这个问题,例如,告诉 rate_limiter
协同程序跳出其循环的事件,但这感觉像是没有实际好处的额外代码。使用 asyncio 时,您应该如何处理这种情况?
编辑: 我不清楚:我正在寻找的是线程上的 daemon
标志之类的东西:使它成为现实的东西所以我 不必等待特定任务, 理想地表示为任务本身或其协同程序的注释。我也会接受一个证明没有这种机制的答案。我已经知道解决方法。
.cancel()
任务然后等待它被取消,在外面捕获 CancelledError
:
# vim: tabstop=4 expandtab
import asyncio
import sys
@asyncio.coroutine
def rate_limiter(queue, rate):
"""Push tokens to QUEUE at a rate of RATE per second."""
delay = 1/rate
while True:
yield from asyncio.sleep(delay)
yield from queue.put(None)
@asyncio.coroutine
def do_work(n, rate):
for i in range(n):
yield from rate.get()
sys.stdout.write("job {}\n".format(i))
def main():
loop = asyncio.get_event_loop()
rate = asyncio.Queue()
rltask = loop.create_task(rate_limiter(rate, 10))
wtask = loop.create_task(do_work(20, rate))
loop.run_until_complete(wtask)
rltask.cancel()
try:
loop.run_until_complete(rltask)
except asyncio.CancelledError:
...
loop.close()
main()
为了避免 "Task was destroyed but it is pending!" 警告,如果您为相应的未来对象设置虚拟结果,则可以在退出程序时将永不结束的协程标记为已完成:
#!/usr/bin/env python3.5
import asyncio
import itertools
from contextlib import closing, contextmanager
@contextmanager
def finishing(coro_or_future, *, loop=None):
"""Mark a never ending coroutine or future as done on __exit__."""
fut = asyncio.ensure_future(
coro_or_future, loop=loop) # start infinite loop
try:
yield
finally:
if not fut.cancelled():
fut.set_result(None) # mark as finished
async def never_ends():
for c in itertools.cycle('\|/-'):
print(c, end='\r', flush=True)
await asyncio.sleep(.3)
with closing(asyncio.get_event_loop()) as loop, \
finishing(never_ends(), loop=loop):
loop.run_until_complete(asyncio.sleep(3)) # do something else
它假定您的协程在进程退出之前不需要显式清理。在后一种情况下,为清理定义一个显式过程:提供可以调用的方法(例如,server.close()
、server.wait_closed()
),或传递一个调用者应该调用的事件(asyncio.Event
)在关闭时设置,或引发异常(例如 CancelledError
)。
引入 finishing()
的好处是检测错误,即您不应忽略警告,除非它被 finishing()
调用明确静音。
有时异步任务没有有意义的终止条件 - 例如,在下面的程序中,"rate_limiter" 任务在队列中以固定速率永远生成令牌流。
import asyncio
import sys
@asyncio.coroutine
def rate_limiter(queue, rate):
"""Push tokens to QUEUE at a rate of RATE per second."""
delay = 1/rate
while True:
yield from asyncio.sleep(delay)
yield from queue.put(None)
@asyncio.coroutine
def do_work(n, rate):
for i in range(n):
yield from rate.get()
sys.stdout.write("job {}\n".format(i))
def main():
loop = asyncio.get_event_loop()
rate = asyncio.Queue()
rltask = loop.create_task(rate_limiter(rate, 10))
wtask = loop.create_task(do_work(20, rate))
loop.run_until_complete(wtask)
main()
这个程序运行完美 除了 asyncio 库认为它是一个编程错误,当没有什么可以限制速率时直接丢弃 rltask
;你收到类似
...
job 18
job 19
Task was destroyed but it is pending!
task: <Task pending coro=<rate_limiter() running at rl.py:9>
wait_for=<Future pending cb=[Task._wakeup()]>>
(是否处于调试模式)。
我可以解决这个问题,例如,告诉 rate_limiter
协同程序跳出其循环的事件,但这感觉像是没有实际好处的额外代码。使用 asyncio 时,您应该如何处理这种情况?
编辑: 我不清楚:我正在寻找的是线程上的 daemon
标志之类的东西:使它成为现实的东西所以我 不必等待特定任务, 理想地表示为任务本身或其协同程序的注释。我也会接受一个证明没有这种机制的答案。我已经知道解决方法。
.cancel()
任务然后等待它被取消,在外面捕获 CancelledError
:
# vim: tabstop=4 expandtab
import asyncio
import sys
@asyncio.coroutine
def rate_limiter(queue, rate):
"""Push tokens to QUEUE at a rate of RATE per second."""
delay = 1/rate
while True:
yield from asyncio.sleep(delay)
yield from queue.put(None)
@asyncio.coroutine
def do_work(n, rate):
for i in range(n):
yield from rate.get()
sys.stdout.write("job {}\n".format(i))
def main():
loop = asyncio.get_event_loop()
rate = asyncio.Queue()
rltask = loop.create_task(rate_limiter(rate, 10))
wtask = loop.create_task(do_work(20, rate))
loop.run_until_complete(wtask)
rltask.cancel()
try:
loop.run_until_complete(rltask)
except asyncio.CancelledError:
...
loop.close()
main()
为了避免 "Task was destroyed but it is pending!" 警告,如果您为相应的未来对象设置虚拟结果,则可以在退出程序时将永不结束的协程标记为已完成:
#!/usr/bin/env python3.5
import asyncio
import itertools
from contextlib import closing, contextmanager
@contextmanager
def finishing(coro_or_future, *, loop=None):
"""Mark a never ending coroutine or future as done on __exit__."""
fut = asyncio.ensure_future(
coro_or_future, loop=loop) # start infinite loop
try:
yield
finally:
if not fut.cancelled():
fut.set_result(None) # mark as finished
async def never_ends():
for c in itertools.cycle('\|/-'):
print(c, end='\r', flush=True)
await asyncio.sleep(.3)
with closing(asyncio.get_event_loop()) as loop, \
finishing(never_ends(), loop=loop):
loop.run_until_complete(asyncio.sleep(3)) # do something else
它假定您的协程在进程退出之前不需要显式清理。在后一种情况下,为清理定义一个显式过程:提供可以调用的方法(例如,server.close()
、server.wait_closed()
),或传递一个调用者应该调用的事件(asyncio.Event
)在关闭时设置,或引发异常(例如 CancelledError
)。
引入 finishing()
的好处是检测错误,即您不应忽略警告,除非它被 finishing()
调用明确静音。