Limiting number of concurrent AsyncIO tasks using Semaphore not working
Objective:
I am trying to scrape multiple URLs concurrently. I don't want to make too many requests at the same time, so I am using a Semaphore to limit them.
Problem:
All of the tasks are being run at once instead of a limited number at a time.
Stripped-down code:
import asyncio
import os
import random
import time

async def download_all_product_information():
    # TO LIMIT THE NUMBER OF CONCURRENT REQUESTS
    async def gather_with_concurrency(n, *tasks):
        semaphore = asyncio.Semaphore(n)

        async def sem_task(task):
            async with semaphore:
                return await task

        return await asyncio.gather(*(sem_task(task) for task in tasks))

    # FUNCTION TO ACTUALLY DOWNLOAD INFO
    async def get_product_information(url_to_append):
        url = 'https://www.amazon.com.br' + url_to_append
        print('Product Information - Page ' + str(current_page_number) + ' for category ' + str(
            category_index) + '/' + str(len(all_categories)) + ' in ' + gender)
        source = await get_source_code_or_content(url, should_render_javascript=True)
        time.sleep(random.uniform(2, 5))
        return source

    # LOOP WHERE STUFF GETS DONE
    for current_page_number in range(1, 401):
        for gender in os.listdir(base_folder):
            all_tasks = []
            # check all products in the current page
            all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
            for product_specific_url in all_products_in_current_page:
                current_task = asyncio.create_task(get_product_information(product_specific_url))
                all_tasks.append(current_task)
            await gather_with_concurrency(random.randrange(8, 15), *all_tasks)
async def main():
    await download_all_product_information()

# just to make sure there are not any problems caused by two event loops
if asyncio.get_event_loop().is_running():  # only patch if needed (i.e. running in Notebook, Spyder, etc)
    import nest_asyncio
    nest_asyncio.apply()

# for asynchronous functionality
if __name__ == '__main__':
    asyncio.run(main())
What am I doing wrong? Thanks!
The problem is in this line:
current_task = asyncio.create_task(get_product_information(product_specific_url))
When you create a Task, it is scheduled for execution immediately. As soon as your code yields to the asyncio loop (at any await expression), asyncio will loop through all of your tasks and run them.
In the original snippet you pointed to, the semaphore guards the creation of the tasks themselves, ensuring that only "n" tasks are active at a time. What is passed to gather_with_concurrency in that snippet are coroutines.
Unlike tasks, coroutines are objects that are ready to be awaited but not yet scheduled. They can be passed around freely, just like any other object - they are only executed when they are awaited or wrapped in a task (and then, only when the code passes control to the asyncio loop).
In your code, you create a coroutine with each call to get_product_information and immediately wrap it in a task. By the time the await on the line calling gather_with_concurrency runs, all of the tasks are already running at once.
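Here is a minimal, self-contained sketch of that behavior (the worker function is hypothetical, not part of the code above): once the coroutines are wrapped in tasks, all of them start the first time the event loop gets control, before any semaphore can gate them.

import asyncio

async def worker(i):
    print(f'worker {i} started')  # all five print before any of them finishes
    await asyncio.sleep(0.1)

async def main():
    # create_task() schedules each coroutine for execution right away
    tasks = [asyncio.create_task(worker(i)) for i in range(5)]
    # the first yield to the event loop runs every pending task
    await asyncio.sleep(0)
    await asyncio.gather(*tasks)

asyncio.run(main())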
The fix is simple: do not create the task at this point - let it be created only inside the code guarded by the semaphore. Add just the raw coroutines to your list:
...
            all_coroutines = []
            # check all products in the current page
            all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
            for product_specific_url in all_products_in_current_page:
                current_coroutine = get_product_information(product_specific_url)
                all_coroutines.append(current_coroutine)
            await gather_with_concurrency(random.randrange(8, 15), *all_coroutines)
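This works because asyncio.gather still wraps each sem_task coroutine in a task, but the inner get_product_information coroutine only starts executing once sem_task has acquired the semaphore - so at most n downloads are in flight at any moment.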
There is still an unrelated bug in this code that will break the concurrency: you are making a synchronous call to time.sleep inside get_product_information. This stalls the asyncio loop at that point until the sleep is over. The correct thing to do is to use await asyncio.sleep(...).
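Applied to the function above, the non-blocking delay would look roughly like this (a sketch; get_source_code_or_content is the asker's own helper, and the print statement is omitted):

    async def get_product_information(url_to_append):
        url = 'https://www.amazon.com.br' + url_to_append
        source = await get_source_code_or_content(url, should_render_javascript=True)
        # asyncio.sleep() suspends only this coroutine and yields control back
        # to the event loop, instead of freezing it the way time.sleep() does
        await asyncio.sleep(random.uniform(2, 5))
        return source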