为什么需要 create_task() 来使用 asyncio gather 创建协程队列?
Why is create_task() needed to create a queue of coroutines using asyncio gather?
我在事件循环中有以下代码 运行ning,我使用 asyncio
下载大量文件并使用 asyncio.queue
限制下载的文件数量:
download_tasks = asyncio.Queue()
for file in files:
# download_file() is an async function that downloads a file from Microsoft blob storage
# that is basically await blob.download_blob()
download_tasks.put_nowait(asyncio.create_task(download_file(file=file))
async def worker():
while not download_tasks.empty():
return await download_tasks.get_nowait()
worker_limit = 10
# each call to download_file() returns a pandas dataframe
df_list = await asyncio.gather(*[worker() for _ in range(worker_limit)], return_exceptions=True)
df = pd.concat(df_list)
这段代码似乎 运行 没问题,但我最初将 for 循环定义为:
for file in files:
# download_file() is an async function that downloads a file from Microsoft blob storage
# that is basically await blob.download_blob()
download_tasks.put_nowait(download_file(file=file)
使用此代码,结果相同,但我收到以下警告:
RuntimeWarning: coroutine 'download_file' was never awaited
查看 asyncio 示例,有时我会看到 create_task()
在创建协程列表或队列时使用 运行 收集,有时我不会。为什么在我的案例中需要它,使用它的最佳做法是什么?
编辑:正如@user2357112supportsMonica 不客气地指出的那样,worker()
中的 return
语句没有任何意义。这段代码的要点是限制并发,因为我可能必须一次下载数千个,并且想使用队列将它一次限制为 10 个。所以我的实际问题是,如何使用 gather return 使用此队列实现的所有结果?
编辑 2:我似乎找到了一个简单的解决方案,它使用信号量而不是队列,下面的代码改编自这个答案 :
download_tasks = []
for file in files:
download_tasks.append(download_file(file=file))
async def gather_with_concurrency(n, *tasks):
semaphore = asyncio.Semaphore(n)
async def sem_task(task):
async with semaphore:
return await task
return await asyncio.gather(*(sem_task(task) for task in tasks))
df_list = await gather_with_concurrency(10, *download_tasks)
return pd.concat(df_list)
正如“user2357112 supports Monica”所指出的那样,最初的问题可能来自拥有 return
的工作人员,因此每个工作人员将下载一个文件然后退出,这意味着前 10 个之后的任何协程都将被忽略并且永远不会等待(您可能会看到,如果您在假定的处理完成后记录有关 download_tasks
的信息)。
create_tasks
失败了,因为它会立即同时安排下载(击败尝试的速率限制/工人池),然后不正确的工人代码将忽略前 10 个项目之后的任何内容。
无论如何,协程(例如裸 async
函数)和任务之间的区别在于任务 是独立调度的 。也就是说,一旦你创建了一个任务,它就会独立地生活,如果你不想要它的结果,你不必 await
它。这类似于 Javascript 的异步函数。
协同程序,但是,在等待它们之前不要做任何事情,它们只有在被明确轮询并且只能通过等待它们来完成(直接或间接地,例如 gather
或 wait
将 await/poll 它们包装的对象)。
我在事件循环中有以下代码 运行ning,我使用 asyncio
下载大量文件并使用 asyncio.queue
限制下载的文件数量:
download_tasks = asyncio.Queue()
for file in files:
# download_file() is an async function that downloads a file from Microsoft blob storage
# that is basically await blob.download_blob()
download_tasks.put_nowait(asyncio.create_task(download_file(file=file))
async def worker():
while not download_tasks.empty():
return await download_tasks.get_nowait()
worker_limit = 10
# each call to download_file() returns a pandas dataframe
df_list = await asyncio.gather(*[worker() for _ in range(worker_limit)], return_exceptions=True)
df = pd.concat(df_list)
这段代码似乎 运行 没问题,但我最初将 for 循环定义为:
for file in files:
# download_file() is an async function that downloads a file from Microsoft blob storage
# that is basically await blob.download_blob()
download_tasks.put_nowait(download_file(file=file)
使用此代码,结果相同,但我收到以下警告:
RuntimeWarning: coroutine 'download_file' was never awaited
查看 asyncio 示例,有时我会看到 create_task()
在创建协程列表或队列时使用 运行 收集,有时我不会。为什么在我的案例中需要它,使用它的最佳做法是什么?
编辑:正如@user2357112supportsMonica 不客气地指出的那样,worker()
中的 return
语句没有任何意义。这段代码的要点是限制并发,因为我可能必须一次下载数千个,并且想使用队列将它一次限制为 10 个。所以我的实际问题是,如何使用 gather return 使用此队列实现的所有结果?
编辑 2:我似乎找到了一个简单的解决方案,它使用信号量而不是队列,下面的代码改编自这个答案
download_tasks = []
for file in files:
download_tasks.append(download_file(file=file))
async def gather_with_concurrency(n, *tasks):
semaphore = asyncio.Semaphore(n)
async def sem_task(task):
async with semaphore:
return await task
return await asyncio.gather(*(sem_task(task) for task in tasks))
df_list = await gather_with_concurrency(10, *download_tasks)
return pd.concat(df_list)
正如“user2357112 supports Monica”所指出的那样,最初的问题可能来自拥有 return
的工作人员,因此每个工作人员将下载一个文件然后退出,这意味着前 10 个之后的任何协程都将被忽略并且永远不会等待(您可能会看到,如果您在假定的处理完成后记录有关 download_tasks
的信息)。
create_tasks
失败了,因为它会立即同时安排下载(击败尝试的速率限制/工人池),然后不正确的工人代码将忽略前 10 个项目之后的任何内容。
无论如何,协程(例如裸 async
函数)和任务之间的区别在于任务 是独立调度的 。也就是说,一旦你创建了一个任务,它就会独立地生活,如果你不想要它的结果,你不必 await
它。这类似于 Javascript 的异步函数。
协同程序,但是,在等待它们之前不要做任何事情,它们只有在被明确轮询并且只能通过等待它们来完成(直接或间接地,例如 gather
或 wait
将 await/poll 它们包装的对象)。