如何使用 asyncio 同时 运行 无限循环?

How to concurrently run a infinite loop with asyncio?

等待时如何继续下一个循环?例如:

async def get_message():
    # async get message from queue
    return message

async process_message(message):
    # make some changes on message
    return message

async def deal_with_message(message):
    # async update some network resource with given message

async def main():
    while True:
        message = await get_message()
        message = await process_message(message)
        await deal_with_message(message)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

如何使 while True 循环并发?如果正在等待deal_with_message,可以进入下一个循环运行get_message?

已编辑

我想我找到了解决办法:

async def main():
    asyncio.ensure_future(main())
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)

loop = asyncio.get_event_loop()
asyncio.ensure_future(main())
loop.run_forever()

最简单的解决方案是 asyncio.ensure_future

async def main():
    tasks = []
    while running:
        message = await get_message()
        message = await process_message(message)
        coroutine = deal_with_message(message)
        task = asyncio.ensure_future(coroutine) # starts running coroutine
        tasks.append(task)
    await asyncio.wait(tasks)

如果您的所有任务都可以在最后等待,您可以选择自己跟踪任务。

async def main():
    while running:
        message = await get_message()
        message = await process_message(message)
        coroutine = deal_with_message(message)
        asyncio.ensure_future(coroutine)
    tasks = asyncio.Task.all_tasks()
    await asyncio.wait(tasks)

您的解决方案会奏效,但我发现它有问题。

async def main():
    asyncio.ensure_future(main())
    # task finishing

一旦 main 开始,它就会创建新任务,并且会立即发生(ensure_future 会立即创建任务),这与实际完成该任务需要时间不同。我想这可能会导致创建大量任务,从而耗尽您的 RAM。

除此之外,这意味着潜在的任何大量任务都可以 运行 同时进行。它会耗尽您的网络吞吐量或可以同时打开的套接字数量(想象一下您要并行下载 1 000 000 个 url - 不会发生任何好事)。

在并发世界中,这个问题通常 can be solved by limiting amount of things that can be ran concurrently with some sensible value using something like Semaphore。但是,在您的情况下,我认为手动跟踪 运行 任务数量并手动填充它更方便:

import asyncio
from random import randint


async def get_message():
    message = randint(0, 1_000)
    print(f'{message} got')
    return message


async def process_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'{message} processed')
    return message


async def deal_with_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'{message} dealt')


async def utilize_message():
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)


parallel_max = 5  # don't utilize more than 5 msgs parallely
parallel_now = 0


def populate_tasks():
    global parallel_now
    for _ in range(parallel_max - parallel_now):
        parallel_now += 1
        task = asyncio.ensure_future(utilize_message())
        task.add_done_callback(on_utilized)


def on_utilized(_):
    global parallel_now
    parallel_now -= 1
    populate_tasks()


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        populate_tasks()
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

输出如下:

939 got
816 got
737 got
257 got
528 got
939 processed
816 processed
528 processed
816 dealt
589 got
939 dealt
528 dealt
712 got
263 got
737 processed
257 processed
263 processed
712 processed
263 dealt
712 dealt
386 got
708 got
589 processed
257 dealt
386 processed
708 processed
711 got
711 processed

这里的重要部分是我们如何在 运行 任务数量减少到少于五个后才使用下一条消息。

更新:

是的,如果您不需要动态更改最大 运行 数字,信号量似乎更方便。

sem = asyncio.Semaphore(5)


async def main():
    async with sem:
        asyncio.ensure_future(main())
        await utilize_message()


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        asyncio.ensure_future(main())
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()