需要实际值时自动等待协程?

Automatic awaiting on coroutines when actual values are needed?

假设我想使用 asyncio 实现以下内容:

def f(): 
    val1 = a()  # a() takes 1 sec
    val2 = b()  # b() takes 3 sec
    val3 = c(val1, val2)  # c() takes 1 sec, but must wait for a() and b() to finish
    val4 = d(val1)  # d() takes 1 sec, but must wait for a() to finish

所有函数 a、b、c、d 都是异步的,并且可能 运行 并行。 运行 的优化方法是:1) 运行 a() 和 b() 并行。 2) 当 a() 完成后,运行 d()。 3)当a()和b()完成后,运行c()。一切加在一起应该需要 4 秒。

我发现用 asyncio 实现它并不理想:

import time
import asyncio

async def a():
    await asyncio.sleep(1)

async def b():
    await asyncio.sleep(3)

async def c(val1, val2):
    await val2
    await asyncio.sleep(1)

async def d(val1):
    await val1
    await asyncio.sleep(1)

async def f():
    val1 = a()
    val2 = b()
    val3 = c(val1, val2)
    val4 = d(val1)
    return await asyncio.gather(val3, val4)

t1 = time.time()
await f()
t2 = time.time()
print(t2 - t1)  # This will be 4 seconds indeed

以上实现有效,但主要流程是我需要知道 a() 在 b() 之前完成,以便在 d() 中等待 val1 而不是在 c() 中等待它。换句话说,给定一个(可能很复杂的)执行图,我必须知道哪些函数先于其他函数完成,以便将“await”语句放在正确的位置。如果我在两个地方等待同一个协程,我得到一个异常。

我的问题如下:在 asyncio(或其他 python 模块)中是否有一种机制可以在需要将它们解析为实际值时自动等待协程?我知道在其他并行执行机制中也实现了这样的机制。

有很多方法可以做到。一种可能性是使用同步原语,例如 asyncio.Event。例如:

import time
import asyncio


val1 = None
val2 = None

event_a = None
event_b = None


async def a():
    global val1
    await asyncio.sleep(1)  # some computation
    val1 = 1
    event_a.set()


async def b():
    global val2
    await asyncio.sleep(3)
    val2 = 100
    event_b.set()


async def c():
    await event_a.wait()
    await event_b.wait()

    await asyncio.sleep(1)

    return val1 + val2


async def d():
    await event_a.wait()

    await asyncio.sleep(1)

    return val1 * 2


async def f():
    global event_a
    global event_b

    event_a = asyncio.Event()
    event_b = asyncio.Event()

    out = await asyncio.gather(a(), b(), c(), d())
    assert out[2] == 101
    assert out[3] == 2


async def main():
    t1 = time.time()
    await f()
    t2 = time.time()
    print(t2 - t1)


if __name__ == "__main__":
    asyncio.run(main())

打印:

4.0029356479644775

另一种选择是将计算拆分为更多协程,例如:

import time
import asyncio


async def a():
    await asyncio.sleep(1)  # some computation
    return 1


async def b():
    await asyncio.sleep(3)
    return 100


async def c(val1, val2):
    await asyncio.sleep(1)
    return val1 + val2


async def d(val1):
    await asyncio.sleep(1)
    return val1 * 2


async def f():
    async def task1():
        params = await asyncio.gather(a(), b())  # <-- run a() and b() in parallel
        return await c(*params)

    async def task2():
        return await d(await a())

    out = await asyncio.gather(task1(), task2())
    assert out[0] == 101
    assert out[1] == 2


async def main():
    t1 = time.time()
    await f()
    t2 = time.time()
    print(t2 - t1)


if __name__ == "__main__":
    asyncio.run(main())

打印:

4.00294041633606