asyncio task was destroyed but it is pending

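I am working on a sample program that reads from a data source (CSV or RDBMS) in chunks, applies some transformations, and sends the result to a server over a socket.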

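Because the CSV is very large, for testing purposes I want to stop reading after a few chunks. Unfortunately something goes wrong, and I don't know what it is or how to fix it. I probably have to cancel something, but I'm not sure where or how. I get the following error: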

Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>

The sample code is:

import asyncio
import json

async def readChunks():
  # this is basically a dummy alternative for reading csv in chunks
  df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
  for chunk in df:
    await asyncio.sleep(0.001)
    yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: {j}")
    await asyncio.sleep(0.001)


async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send({k:v}))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

The problem is simple. You exit the loop early, but the async generator is not yet exhausted (it is still pending):

...
if i > 5:
    break
...

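Many async resources, such as generators, need the help of an event loop to clean up. When an async for loop stops iterating an async generator via break, the generator is cleaned up only by the garbage collector. This means the task is pending (waiting for the event loop) but gets destroyed (by the garbage collector).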
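To make the deferred cleanup visible, here is a minimal sketch. The names read_chunks and events are illustrative stand-ins, not part of the question's code. The generator records when its finally block runs, and main keeps a reference to it so the garbage collector does not step in. The sketch closes the generator by hand at the end only so that the moment of cleanup can be observed.

```python
import asyncio

async def read_chunks(events):
    # Illustrative stand-in for readChunks(); the finally block
    # records the moment the generator is actually cleaned up.
    try:
        for x in range(10):
            await asyncio.sleep(0)
            yield x
    finally:
        events.append("generator cleaned up")

async def main():
    events = []
    gen = read_chunks(events)  # keep a reference so it is not garbage collected
    async for chunk in gen:
        if chunk >= 2:
            break
    # At this point the generator is still suspended at its yield.
    events.append("loop exited")
    await gen.aclose()  # closed by hand so the sketch ends cleanly
    return events

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The recorded order shows that break alone does not run the generator's cleanup; it only happens once something closes the generator.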

The most straightforward fix is to aclose the generator explicitly:

async def main():
    i = 0
    aiter = readChunks()      # name iterator in order to ...
    try:
        async for chunk in aiter:
            ...
            i += 1
            if i > 5:
                break
    finally:
        await aiter.aclose()  # ... clean it up when done

These patterns can be simplified using asyncstdlib (disclaimer: I maintain this library). asyncstdlib.islice allows taking a fixed number of items before cleanly closing the generator:

import asyncstdlib as a

async def main():
    async for chunk in a.islice(readChunks(), 5):
        ...

If the break condition is dynamic, scoping the iterator guarantees cleanup in any case:

import asyncstdlib as a

async def main():
    async with a.scoped_iter(readChunks()) as aiter:
        async for idx, chunk in a.enumerate(aiter):
            ...
            if idx >= 5:
                break
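If adding a dependency is not an option, the fixed-count case can be approximated with the standard library alone. The helper below, take, is hypothetical and is not how asyncstdlib implements islice; it is only a sketch of the idea that a wrapper can stop after n items and close its source itself. It helps only when the wrapper is iterated to its end; breaking out of the wrapper early brings back the original problem.

```python
import asyncio

async def read_chunks(log):
    # Illustrative stand-in for readChunks().
    try:
        for x in range(10):
            await asyncio.sleep(0.001)
            yield x
    finally:
        log.append("source closed")

async def take(source, n):
    """Hypothetical helper: yield at most n items from source, then close it."""
    try:
        if n > 0:
            count = 0
            async for item in source:
                yield item
                count += 1
                if count >= n:
                    break
    finally:
        # Runs while the event loop is still available, so the
        # source generator is closed in an orderly way.
        await source.aclose()

async def main():
    log = []
    items = [item async for item in take(read_chunks(log), 5)]
    return items, log

if __name__ == "__main__":
    print(asyncio.run(main()))
```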

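Your readChunks is running asynchronously alongside your loop, and you break out of the loop before it has finished.

That is why you get asyncio task was destroyed but it is pending.

In short, the async task is doing its work in the background, but you killed it by breaking out of the loop (stopping the program).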

This works...

import asyncio
import json
import logging

logging.basicConfig(format='%(asctime)s.%(msecs)03d %(message)s',
                    datefmt='%S')
root = logging.getLogger()
root.setLevel(logging.INFO)

async def readChunks():
  # this is basically a dummy alternative for reading csv in chunks
  df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
  for chunk in df:
    await asyncio.sleep(0.002)
    root.info('readChunks: next chunk coming')
    yield chunk

async def send(row):
    j = json.dumps(row)
    root.info(f"to be sent: {j}")
    await asyncio.sleep(0.002)


async def main():
    i = 0
    root.info('main: starting to read chunks')
    async for chunk in readChunks():
        for k, v in chunk.items():
            root.info(f'main: sending an item')
            #await asyncio.gather(send({k:v}))
            stuff = await send({k:v})
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")

##loop = asyncio.get_event_loop()
##loop.run_until_complete(main())
##loop.close()

if __name__ == '__main__':

    asyncio.run(main())

...at least it runs and finishes.


The problem of stopping an async generator by exiting an async for loop is described in bugs.python.org/issue38013, and it looks like it was fixed in 3.7.5.

However, using

loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()

I get a debug error, but no exception, in Python 3.8:

Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<<async_generator_athrow without __name__>()>>

Using the higher-level API asyncio.run(main()) with debugging on, I do not get the debug message. If you plan to upgrade to Python 3.7.5-9, you should probably still use asyncio.run().
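As a rough way to check this, asyncio.run accepts a debug=True keyword, and the messages sent to the "asyncio" logger can be collected and inspected. The sketch below combines that with an explicit aclose() in main. ListHandler, read_chunks and run_with_debug are illustrative names, not part of the code above.

```python
import asyncio
import logging

class ListHandler(logging.Handler):
    """Collects log messages so they can be inspected afterwards."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def read_chunks():
    # Illustrative stand-in for readChunks().
    for x in range(10):
        await asyncio.sleep(0.001)
        yield x

async def main():
    chunks = read_chunks()
    try:
        async for chunk in chunks:
            if chunk >= 5:
                break
    finally:
        await chunks.aclose()  # close explicitly before the loop shuts down

def run_with_debug():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(main(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Keep only messages about destroyed pending tasks.
    return [m for m in handler.messages if "destroyed" in m]

if __name__ == "__main__":
    print(run_with_debug())
```

With the generator closed explicitly, the returned list should be empty, meaning no "Task was destroyed but it is pending!" message was logged.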