需要实际值时自动等待协程?
Automatic awaiting on coroutines when actual values are needed?
假设我想使用 asyncio 实现以下内容:
def f():
val1 = a() # a() takes 1 sec
val2 = b() # b() takes 3 sec
val3 = c(val1, val2) # c() takes 1 sec, but must wait for a() and b() to finish
val4 = d(val1) # d() takes 1 sec, but must wait for a() to finish
所有函数 a、b、c、d 都是异步的,并且可能 运行 并行。 运行 的优化方法是:1) 运行 a() 和 b() 并行。 2) 当 a() 完成后,运行 d()。 3)当a()和b()完成后,运行c()。一切加在一起应该需要 4 秒。
我发现用 asyncio 实现它并不理想:
import time
import asyncio
async def a():
await asyncio.sleep(1)
async def b():
await asyncio.sleep(3)
async def c(val1, val2):
await val2
await asyncio.sleep(1)
async def d(val1):
await val1
await asyncio.sleep(1)
async def f():
val1 = a()
val2 = b()
val3 = c(val1, val2)
val4 = d(val1)
return await asyncio.gather(val3, val4)
t1 = time.time()
await f()
t2 = time.time()
print(t2 - t1) # This will be 4 seconds indeed
以上实现有效,但主要流程是我需要知道 a() 在 b() 之前完成,以便在 d() 中等待 val1 而不是在 c() 中等待它。换句话说,给定一个(可能很复杂的)执行图,我必须知道哪些函数先于其他函数完成,以便将“await”语句放在正确的位置。如果我在两个地方等待同一个协程,我得到一个异常。
我的问题如下:在 asyncio(或其他 python 模块)中是否有一种机制可以在需要将它们解析为实际值时自动等待协程?我知道在其他并行执行机制中也实现了这样的机制。
有很多方法可以做到。一种可能性是使用同步原语,例如 asyncio.Event
。例如:
import time
import asyncio
val1 = None
val2 = None
event_a = None
event_b = None
async def a():
global val1
await asyncio.sleep(1) # some computation
val1 = 1
event_a.set()
async def b():
global val2
await asyncio.sleep(3)
val2 = 100
event_b.set()
async def c():
await event_a.wait()
await event_b.wait()
await asyncio.sleep(1)
return val1 + val2
async def d():
await event_a.wait()
await asyncio.sleep(1)
return val1 * 2
async def f():
global event_a
global event_b
event_a = asyncio.Event()
event_b = asyncio.Event()
out = await asyncio.gather(a(), b(), c(), d())
assert out[2] == 101
assert out[3] == 2
async def main():
t1 = time.time()
await f()
t2 = time.time()
print(t2 - t1)
if __name__ == "__main__":
asyncio.run(main())
打印:
4.0029356479644775
另一种选择是将计算拆分为更多协程,例如:
import time
import asyncio
async def a():
await asyncio.sleep(1) # some computation
return 1
async def b():
await asyncio.sleep(3)
return 100
async def c(val1, val2):
await asyncio.sleep(1)
return val1 + val2
async def d(val1):
await asyncio.sleep(1)
return val1 * 2
async def f():
async def task1():
params = await asyncio.gather(a(), b()) # <-- run a() and b() in parallel
return await c(*params)
async def task2():
return await d(await a())
out = await asyncio.gather(task1(), task2())
assert out[0] == 101
assert out[1] == 2
async def main():
t1 = time.time()
await f()
t2 = time.time()
print(t2 - t1)
if __name__ == "__main__":
asyncio.run(main())
打印:
4.00294041633606
假设我想使用 asyncio 实现以下内容:
def f():
val1 = a() # a() takes 1 sec
val2 = b() # b() takes 3 sec
val3 = c(val1, val2) # c() takes 1 sec, but must wait for a() and b() to finish
val4 = d(val1) # d() takes 1 sec, but must wait for a() to finish
所有函数 a、b、c、d 都是异步的,并且可能 运行 并行。 运行 的优化方法是:1) 运行 a() 和 b() 并行。 2) 当 a() 完成后,运行 d()。 3)当a()和b()完成后,运行c()。一切加在一起应该需要 4 秒。
我发现用 asyncio 实现它并不理想:
import time
import asyncio
async def a():
await asyncio.sleep(1)
async def b():
await asyncio.sleep(3)
async def c(val1, val2):
await val2
await asyncio.sleep(1)
async def d(val1):
await val1
await asyncio.sleep(1)
async def f():
val1 = a()
val2 = b()
val3 = c(val1, val2)
val4 = d(val1)
return await asyncio.gather(val3, val4)
t1 = time.time()
await f()
t2 = time.time()
print(t2 - t1) # This will be 4 seconds indeed
以上实现有效,但主要流程是我需要知道 a() 在 b() 之前完成,以便在 d() 中等待 val1 而不是在 c() 中等待它。换句话说,给定一个(可能很复杂的)执行图,我必须知道哪些函数先于其他函数完成,以便将“await”语句放在正确的位置。如果我在两个地方等待同一个协程,我得到一个异常。
我的问题如下:在 asyncio(或其他 python 模块)中是否有一种机制可以在需要将它们解析为实际值时自动等待协程?我知道在其他并行执行机制中也实现了这样的机制。
有很多方法可以做到。一种可能性是使用同步原语,例如 asyncio.Event
。例如:
import time
import asyncio
val1 = None
val2 = None
event_a = None
event_b = None
async def a():
global val1
await asyncio.sleep(1) # some computation
val1 = 1
event_a.set()
async def b():
global val2
await asyncio.sleep(3)
val2 = 100
event_b.set()
async def c():
await event_a.wait()
await event_b.wait()
await asyncio.sleep(1)
return val1 + val2
async def d():
await event_a.wait()
await asyncio.sleep(1)
return val1 * 2
async def f():
global event_a
global event_b
event_a = asyncio.Event()
event_b = asyncio.Event()
out = await asyncio.gather(a(), b(), c(), d())
assert out[2] == 101
assert out[3] == 2
async def main():
t1 = time.time()
await f()
t2 = time.time()
print(t2 - t1)
if __name__ == "__main__":
asyncio.run(main())
打印:
4.0029356479644775
另一种选择是将计算拆分为更多协程,例如:
import time
import asyncio
async def a():
await asyncio.sleep(1) # some computation
return 1
async def b():
await asyncio.sleep(3)
return 100
async def c(val1, val2):
await asyncio.sleep(1)
return val1 + val2
async def d(val1):
await asyncio.sleep(1)
return val1 * 2
async def f():
async def task1():
params = await asyncio.gather(a(), b()) # <-- run a() and b() in parallel
return await c(*params)
async def task2():
return await d(await a())
out = await asyncio.gather(task1(), task2())
assert out[0] == 101
assert out[1] == 2
async def main():
t1 = time.time()
await f()
t2 = time.time()
print(t2 - t1)
if __name__ == "__main__":
asyncio.run(main())
打印:
4.00294041633606