asyncio task was destroyed but it is pending
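I am working on a sample program that reads from a data source (csv or rdbms) in chunks, applies some transformation, and sends the result via a socket to a server.
Because the csv is very large, for testing purposes I want to stop reading after a few chunks.
Unfortunately something goes wrong, and I do not know what it is or how to fix it. I probably have to cancel something, but I am not sure where or how. I get the following error: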
Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>
The sample code is:

import asyncio
import json

async def readChunks():
    # this is basically a dummy alternative for reading csv in chunks
    df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
    for chunk in df:
        await asyncio.sleep(0.001)
        yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: {j}")
    await asyncio.sleep(0.001)

async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send({k:v}))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

The problem is simple. You exit the loop early, but the async generator is not exhausted yet (it is still pending):

...
    if i > 5:
        break
...

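Many async resources, such as generators, need the help of an event loop to clean up. When an async for loop stops iterating an async generator via break, the generator is cleaned up only by the garbage collector. This means the cleanup task is pending (it waits for the event loop) but gets destroyed (by the garbage collector).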
The most straightforward fix is to aclose the generator explicitly:

async def main():
    i = 0
    aiter = readChunks()      # name iterator in order to ...
    try:
        async for chunk in aiter:
            ...
            i += 1
            if i > 5:
                break
    finally:
        await aiter.aclose()  # ... clean it up when done

These patterns can be simplified using asyncstdlib (disclaimer: I maintain this library). asyncstdlib.islice takes a fixed number of items and then cleanly closes the generator:

import asyncstdlib as a

async def main():
    async for chunk in a.islice(readChunks(), 5):
        ...
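
If adding a dependency is not an option, the same idea can be approximated with the standard library alone. The helper below is a rough sketch and not how asyncstdlib implements islice; the names take and numbers are hypothetical. It yields at most n items and then awaits aclose() on the source.

```python
import asyncio


async def take(source, n):
    """Yield at most n items from async generator `source`, then close it."""
    try:
        if n <= 0:
            return
        count = 0
        async for item in source:
            yield item
            count += 1
            if count >= n:
                break
    finally:
        # Close the source while the event loop is still running.
        await source.aclose()


async def numbers(log):
    # Hypothetical data source; `log` records that cleanup happened.
    try:
        for i in range(10):
            await asyncio.sleep(0)
            yield i
    finally:
        log.append("closed")


async def main():
    log = []
    items = [item async for item in take(numbers(log), 5)]
    return items, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that take is itself an async generator, so this only helps when the consumer iterates it to the end; breaking out of take early brings back the original problem one level up.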

If the break condition is dynamic, scoping the iterator guarantees cleanup in any case:

import asyncstdlib as a

async def main():
    async with a.scoped_iter(readChunks()) as aiter:
        async for idx, chunk in a.enumerate(aiter):
            ...
            if idx >= 5:
                break

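Your readChunks runs asynchronously, driven by your loop, and you break out before it has finished.
That is why you get "asyncio task was destroyed but it is pending".
In short, the async task was still doing its work in the background, but you killed it by breaking the loop (stopping the program).

This works...
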
import asyncio
import json
import logging

logging.basicConfig(format='%(asctime)s.%(msecs)03d %(message)s',
                    datefmt='%S')
root = logging.getLogger()
root.setLevel(logging.INFO)

async def readChunks():
    # this is basically a dummy alternative for reading csv in chunks
    df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
    for chunk in df:
        await asyncio.sleep(0.002)
        root.info('readChunks: next chunk coming')
        yield chunk

async def send(row):
    j = json.dumps(row)
    root.info(f"to be sent: {j}")
    await asyncio.sleep(0.002)

async def main():
    i = 0
    root.info('main: starting to read chunks')
    async for chunk in readChunks():
        for k, v in chunk.items():
            root.info('main: sending an item')
            #await asyncio.gather(send({k:v}))
            stuff = await send({k:v})
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")

##loop = asyncio.get_event_loop()
##loop.run_until_complete(main())
##loop.close()

if __name__ == '__main__':
    asyncio.run(main())

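...at least it runs and finishes.

The problem of stopping an async generator by exiting an async for loop is described in bugs.python.org/issue38013, and it looks like it was fixed in 3.7.5.

However, using
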
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()

I get the debug error, but no exception, in Python 3.8:

Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<<async_generator_athrow without __name__>()>>

Using the higher-level API asyncio.run(main()) with debugging ON, I do not get the debug message. If you are going to try upgrading to Python 3.7.5-9, you should probably still use asyncio.run().
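
One likely reason for the difference is that asyncio.run() is documented to finalize asynchronous generators before it closes the loop. For code that has to keep managing the loop by hand, a similar shutdown step can be added with loop.shutdown_asyncgens(). The following is a minimal sketch under that assumption, with read_chunks and run_with_manual_loop as illustrative names; it has not been checked against every Python version mentioned above.

```python
import asyncio


async def read_chunks():
    # Stand-in for the question's readChunks().
    for x in range(10):
        await asyncio.sleep(0.001)
        yield {f"chunk_{x}": list(range(10))}


async def main():
    received = 0
    async for chunk in read_chunks():
        received += 1
        if received > 5:
            break  # leaves the generator unfinished
    return received


def run_with_manual_loop():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main())
    finally:
        # Run the loop once more so pending generator cleanup can finish,
        # then close it; asyncio.run() performs a similar shutdown step.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


if __name__ == "__main__":
    print(run_with_manual_loop())
```

Closing the generator explicitly inside main() remains the more predictable option, since it does not depend on when the garbage collector notices the abandoned generator.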