自定义异步执行器

Custom asyncio executor

我需要使用具有以下条件(行为)的 asyncio 实现算法:

  1. 检查参数列表是否为空,如果为空结束执行
  2. 从参数列表中弹出下一个参数创建协程
  3. 这个参数并安排它“同时”执行不能
  4. 协程完成后执行的协程不超过 'async_level' 个
  5. 执行 -> 转到步骤 1

有必要不要计划一次完成所有任务(如asyncio.gather),而是分部分进行。当下一个任务完成执行时,一个新的任务将取代它。

我试着用 asyncio.as_completed() 来做,但实际上并没有按预期工作:

async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]

tasks = {asyncio.create_task(job(param)) for param in params[0: async_level]}
params = iter(params[async_level:])

while True:
    # NOTE: It wont work, because you can't add task in 'tasks' after 'as_completed' is invoked, so execution actually ends when the last coroutine in the 'as_completed' ends
    for task in asyncio.as_completed(tasks):
        print(f"len(tasks) = {len(tasks)}")
        await task

        try:
            param = next(params)
            tasks.add(asyncio.create_task(job(param)))
        except StopIteration:
            print("StopIteration")

    break

此外,我尝试使用 asyncio.BoundedSemaphore 来实现它,但不满足前两个条件:

async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]

async def semaphore_job(name, _asyncio_semaphore):
    async with _asyncio_semaphore:
        await job(name)

asyncio_semaphore = asyncio.BoundedSemaphore(async_level)
jobs = []
# NOTE: This variant schedule all jobs at ones and it's significant drawback because the count of jobs can be overwhelmed
for param in params:
    jobs.append(asyncio.ensure_future(semaphore_job(param, asyncio_semaphore)))
await asyncio.gather(*jobs)

如有任何帮助,我将不胜感激。

看来我自己找到了解决办法:

import asyncio
from typing import Callable
from random import randrange
from asyncio import Semaphore, ensure_future, get_event_loop


async def job(name, time_range=10):
    timeout = randrange(time_range)
    print(f"Task '{name}' started with timeout {timeout}")
    await asyncio.sleep(timeout)
    print(f"Task '{name}' finished")
    return name


async def custom_executor(func: Callable, args: list, async_level: int = 4):
    """ Asynchronously executes no more that 'async_level' callables specified by 'func' with corresponding 'args' """
    loop = get_event_loop()
    sync = Semaphore()
    todo = set(args)
    doing = set()

    def _schedule_task():
        if todo:
            arg = todo.pop()
            fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
            f = ensure_future(fr, loop=loop)
            f.add_done_callback(_on_completion)
            doing.add(f)

    def _on_completion(f):
        doing.remove(f)
        sync.release()
        _schedule_task()

    for _ in range(min(async_level, len(todo))):
        _schedule_task()

    while True:
        if not doing:
            break

        await sync.acquire()


async def main():
    await custom_executor(job, [(1, 3), 7, (8, 2), 12, 5])

if __name__ == '__main__':
    asyncio.run(main())

但是如果你知道更好的方法,请分享!

您可以创建固定数量的工作人员并使用队列为他们分配任务。它有点短,我发现它比使用回调的代码更容易推理。但是YMMV.

async def custom_executor(func, args, async_level=4):
    queue = asyncio.Queue(1)
    async def worker():
        while True:
            arg = await queue.get()
            fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
            await fr
            queue.task_done()

    # create the workers
    workers = [asyncio.create_task(worker()) for _ in range(async_level)]

    # Feed the workers tasks. Since the queue is bounded, this will also
    # wait for previous tasks to finish, similar to what you wanted to
    # achieve with as_completed().
    for x in args:
        await queue.put(x)
    await queue.join()  # wait for the remaining tasks to finish

    # cancel the now-idle workers
    for w in workers:
        w.cancel()