使用信号量不工作限制并发 AsyncIO 任务的数量

Limiting number of concurrent AsyncIO tasks using Semaphore not working

Objective:

我正在尝试同时抓取多个网址。我不想同时发出太多请求,所以我使用 来限制它。

问题:

正在请求所有任务,而不是一次请求有限的任务。

精简代码:

async def download_all_product_information():
    # TO LIMIT THE NUMBER OF CONCURRENT REQUESTS
    async def gather_with_concurrency(n, *tasks):
        semaphore = asyncio.Semaphore(n)

        async def sem_task(task):
            async with semaphore:
                return await task

        return await asyncio.gather(*(sem_task(task) for task in tasks))

    # FUNCTION TO ACTUALLY DOWNLOAD INFO
    async def get_product_information(url_to_append):
        url = 'https://www.amazon.com.br' + url_to_append

        print('Product Information - Page ' + str(current_page_number) + ' for category ' + str(
            category_index) + '/' + str(len(all_categories)) + ' in ' + gender)

        source = await get_source_code_or_content(url, should_render_javascript=True)
        time.sleep(random.uniform(2, 5))

        return source

    # LOOP WHERE STUFF GETS DONE
    for current_page_number in range(1, 401):
        for gender in os.listdir(base_folder):
                all_tasks = []

                # check all products in the current page
                all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
                for product_specific_url in all_products_in_current_page:
                    current_task = asyncio.create_task(get_product_information(product_specific_url))

                    all_tasks.append(current_task)

                await gather_with_concurrency(random.randrange(8, 15), *all_tasks)

async def main():
    await download_all_product_information()

# just to make sure there are not any problems caused by two event loops
if asyncio.get_event_loop().is_running():  # only patch if needed (i.e. running in Notebook, Spyder, etc)
    import nest_asyncio

    nest_asyncio.apply()

# for asynchronous functionality
if __name__ == '__main__':
    asyncio.run(main())

我做错了什么?谢谢!

这一行有什么问题:

current_task = asyncio.create_task(get_product_information(product_specific_url))

当您创建“任务”时,它会立即安排执行。尽快 当您的代码执行 asyncio 循环(在任何“等待”表达式)时,asyncio 将循环执行您的所有任务。

在您指出的原始片段中,信号量保护任务本身的创建,确保一次只有“n”个任务处于活动状态。在该片段中传递给 gather_with_concurrency 的是协程。

与任务不同,协同例程是准备等待但尚未安排的对象。它们可以免费传递,就像任何其他对象一样 - 它们只会在等待或被任务包装时执行(然后当代码将控制权传递给异步循环时)。

在您的代码中,您正在使用 get_product_information 调用创建协同例程,并立即将其包装在任务中。在调用gather_with_concurrency本身的行中的await指令中,它们一次都是运行。

解决方法很简单:此时不要创建任务,只需在信号量保护的代码中创建即可。仅将原始协程添加到您的列表中:

...
all_coroutines = []
# check all products in the current page
all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
for product_specific_url in all_products_in_current_page:
     current_coroutine = get_product_information(product_specific_url)

     all_coroutines.append(current_coroutine)

     await gather_with_concurrency(random.randrange(8, 15), *all_coroutines)

这段代码中仍然存在一个不相关的错误,它会导致并发失败:您正在 gather_product_information 中对 time.sleep 进行同步调用。这将在此时停止异步循环 直到睡眠结束。正确的做法是使用 await asyncio.sleep(...) .