如何在运行时向 asyncio 添加任务?

How to add tasks to asyncio at runtime?

基本上,我的目标是收集数据并在运行时发送它,我想异步进行 - 每条消息一个协程。我试图为此构建一个小示例,但它并没有超出我的预期。我假设我需要两个无限循环和一个队列让它们相互通信 - 一个循环获取用户输入,另一个循环监听队列并在队列有内容时输出数据。

这是我的例子:

import asyncio


async def output(queue):
    while True:
        data = await queue.get()
        if data:
            await asyncio.sleep(5)
            print(data)

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.create_task(output(queue))
    while True:
        data = await loop.run_in_executor(None, input)
        await queue.put(data)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(main())
    loop.run_forever()

预期结果是这样的(>>> 表示输入,<<< 表示输出):

>>> 1
--- 0.0001 seconds delay before I enter my next input 
>>> 2
--- 5 seconds delay
<<< 1
--- ~0.0001 seconds delay
<<< 2

但实际结果是这样的:

>>> 1
--- 0.0001 seconds delay before I enter my next input
>>> 2
--- 5 seconds delay
<<< 1
--- 5 seconds delay
<<< 2

如何实现?

你说你想要“每条消息一个协程”。所以就这样做 - 然后你就不需要队列了。我在您的打印语句中添加了一些时间戳,因此您可以看到两个 5 秒的等待实际上是 运行 并行进行的。我还将 main() 函数中的代码替换为对便捷函数 asyncio.run().

的简单调用
import time
import asyncio

async def output(data):
    await asyncio.sleep(5)
    print("<<<", data, time.ctime())

async def main():
    loop = asyncio.get_event_loop()
    while True:
        data = await loop.run_in_executor(None, input)
        print(">>>", data, time.ctime())
        asyncio.create_task(output(data))

if __name__ == "__main__":
    asyncio.run(main())
    
>>> 1 Tue Oct  5 21:44:18 2021
>>> 2 Tue Oct  5 21:44:18 2021
<<< 1 Tue Oct  5 21:44:23 2021
<<< 2 Tue Oct  5 21:44:23 2021