python asyncio add_done_callback 与 async def
python asyncio add_done_callback with async def
我有 2 个函数:第一个 def_a
是一个异步函数,第二个是 def_b
,它是一个常规函数,调用结果为 def_a
作为 add_done_callback
函数的回调。
我的代码如下所示:
import asyncio
def def_b(result):
next_number = result.result()
# some work on the next_number
print(next_number + 1)
async def def_a(number):
await some_async_work(number)
return number + 1
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(def_a(1))
task.add_done_callback(def_b)
response = loop.run_until_complete(task)
loop.close()
而且效果很好。
当第二个函数 def_b
也变成异步时,问题就开始了。现在看起来像这样:
async def def_b(result):
next_number = result.result()
# some asynchronous work on the next_number
print(next_number + 1)
但现在我无法将它提供给 add_done_callback
函数,因为它不是常规函数。
我的问题是 - 如果 def_b
是异步的,我可以向 add_done_callback
函数提供 def_b
吗?
add_done_callback
被认为是“低级”接口。使用协程时,您可以通过多种方式chain them,例如:
import asyncio
async def my_callback(result):
print("my_callback got:", result)
return "My return value is ignored"
async def coro(number):
await asyncio.sleep(number)
return number + 1
async def add_success_callback(fut, callback):
result = await fut
await callback(result)
return result
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coro(1))
task = add_success_callback(task, my_callback)
response = loop.run_until_complete(task)
print("response:", response)
loop.close()
请记住,如果您的未来引发异常,add_done_callback
仍会调用回调(但调用 result.result()
会引发异常)。
这只适用于一个未来的工作,如果你有多个异步工作,它们会互相阻塞,更好的方法是使用 asyncio.as_completed() 来迭代未来的列表:
import asyncio
async def __after_done_callback(future_result):
# await for something...
pass
async def __future_job(number):
await some_async_work(number)
return number + 1
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(__future_job(x)) for x in range(100)] # create 100 future jobs
for f in asyncio.as_completed(tasks):
result = await f
await __after_done_callback(result)
loop.close()
你可以试试aiodag库。它是 asyncio 的一个非常轻量级的包装器,它抽象出一些您通常必须考虑的异步管道。从这个例子中你将无法判断事物是 运行 异步的,因为它只是 1 个任务依赖于另一个,但它都是 运行 异步的。
import asyncio
from aiodag import task
@task
async def def_b(result):
# some asynchronous work on the next_number
print(result + 1)
@task
async def def_a(number):
await asyncio.sleep(number)
return number + 1
async def main():
a = def_a(1)
b = def_b(a) # this makes task b depend on task a
return await b
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
response = loop.run_until_complete(main())
我有 2 个函数:第一个 def_a
是一个异步函数,第二个是 def_b
,它是一个常规函数,调用结果为 def_a
作为 add_done_callback
函数的回调。
我的代码如下所示:
import asyncio
def def_b(result):
next_number = result.result()
# some work on the next_number
print(next_number + 1)
async def def_a(number):
await some_async_work(number)
return number + 1
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(def_a(1))
task.add_done_callback(def_b)
response = loop.run_until_complete(task)
loop.close()
而且效果很好。
当第二个函数 def_b
也变成异步时,问题就开始了。现在看起来像这样:
async def def_b(result):
next_number = result.result()
# some asynchronous work on the next_number
print(next_number + 1)
但现在我无法将它提供给 add_done_callback
函数,因为它不是常规函数。
我的问题是 - 如果 def_b
是异步的,我可以向 add_done_callback
函数提供 def_b
吗?
add_done_callback
被认为是“低级”接口。使用协程时,您可以通过多种方式chain them,例如:
import asyncio
async def my_callback(result):
print("my_callback got:", result)
return "My return value is ignored"
async def coro(number):
await asyncio.sleep(number)
return number + 1
async def add_success_callback(fut, callback):
result = await fut
await callback(result)
return result
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coro(1))
task = add_success_callback(task, my_callback)
response = loop.run_until_complete(task)
print("response:", response)
loop.close()
请记住,如果您的未来引发异常,add_done_callback
仍会调用回调(但调用 result.result()
会引发异常)。
这只适用于一个未来的工作,如果你有多个异步工作,它们会互相阻塞,更好的方法是使用 asyncio.as_completed() 来迭代未来的列表:
import asyncio
async def __after_done_callback(future_result):
# await for something...
pass
async def __future_job(number):
await some_async_work(number)
return number + 1
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(__future_job(x)) for x in range(100)] # create 100 future jobs
for f in asyncio.as_completed(tasks):
result = await f
await __after_done_callback(result)
loop.close()
你可以试试aiodag库。它是 asyncio 的一个非常轻量级的包装器,它抽象出一些您通常必须考虑的异步管道。从这个例子中你将无法判断事物是 运行 异步的,因为它只是 1 个任务依赖于另一个,但它都是 运行 异步的。
import asyncio
from aiodag import task
@task
async def def_b(result):
# some asynchronous work on the next_number
print(result + 1)
@task
async def def_a(number):
await asyncio.sleep(number)
return number + 1
async def main():
a = def_a(1)
b = def_b(a) # this makes task b depend on task a
return await b
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
response = loop.run_until_complete(main())