为什么需要 create_task() 来使用 asyncio gather 创建协程队列?

Why is create_task() needed to create a queue of coroutines using asyncio gather?

我在事件循环中有以下代码 运行ning,我使用 asyncio 下载大量文件并使用 asyncio.queue 限制下载的文件数量:

download_tasks = asyncio.Queue()
for file in files:
    # download_file() is an async function that downloads a file from Microsoft blob storage
    # that is basically await blob.download_blob()
    download_tasks.put_nowait(asyncio.create_task(download_file(file=file))

async def worker():
    while not download_tasks.empty():
        return await download_tasks.get_nowait() 

worker_limit = 10
# each call to download_file() returns a pandas dataframe
df_list = await asyncio.gather(*[worker() for _ in range(worker_limit)], return_exceptions=True)
df = pd.concat(df_list)  

这段代码似乎 运行 没问题,但我最初将 for 循环定义为:

for file in files:
    # download_file() is an async function that downloads a file from Microsoft blob storage
    # that is basically await blob.download_blob()
    download_tasks.put_nowait(download_file(file=file)

使用此代码,结果相同,但我收到以下警告:

RuntimeWarning: coroutine 'download_file' was never awaited

查看 asyncio 示例,有时我会看到 create_task() 在创建协程列表或队列时使用 运行 收集,有时我不会。为什么在我的案例中需要它,使用它的最佳做法是什么?

编辑:正如@user2357112supportsMonica 不客气地指出的那样,worker() 中的 return 语句没有任何意义。这段代码的要点是限制并发,因为我可能必须一次下载数千个,并且想使用队列将它一次限制为 10 个。所以我的实际问题是,如何使用 gather return 使用此队列实现的所有结果?

编辑 2:我似乎找到了一个简单的解决方案,它使用信号量而不是队列,下面的代码改编自这个答案 :

download_tasks = []
for file in files:
    download_tasks.append(download_file(file=file))

async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)
    async def sem_task(task):
        async with semaphore:
            return await task
    return await asyncio.gather(*(sem_task(task) for task in tasks))
    
df_list = await gather_with_concurrency(10, *download_tasks)
return pd.concat(df_list)  

正如“user2357112 supports Monica”所指出的那样,最初的问题可能来自拥有 return 的工作人员,因此每个工作人员将下载一个文件然后退出,这意味着前 10 个之后的任何协程都将被忽略并且永远不会等待(您可能会看到,如果您在假定的处理完成后记录有关 download_tasks 的信息)。

create_tasks 失败了,因为它会立即同时安排下载(击败尝试的速率限制/工人池),然后不正确的工人代码将忽略前 10 个项目之后的任何内容。

无论如何,协程(例如裸 async 函数)和任务之间的区别在于任务 是独立调度的 。也就是说,一旦你创建了一个任务,它就会独立地生活,如果你不想要它的结果,你不必 await 它。这类似于 Javascript 的异步函数。

协同程序,但是,在等待它们之前不要做任何事情,它们只有在被明确轮询并且只能通过等待它们来完成(直接或间接地,例如 gatherwait 将 await/poll 它们包装的对象)。