aiohttp - Splitting tasks while getting a large number of HTML pages - RuntimeError: cannot reuse already awaited coroutine

I have a list of URLs that I fetch and save to HTML files using the following code:

    tasksURL = []
    async with aiohttp.ClientSession() as session:
        for url in listOfURLs:
            tasksURL.append(self.fetch(session, url))
        allHTMLs = await asyncio.gather(*tasksURL)
    for i, html in enumerate(allHTMLs):
        # placeholder path: write each page to its own file
        with open(f"myPath_{i}.html", mode='w', encoding='UTF-8', errors='strict', buffering=1) as f:
            f.write(html)

Since the list of URLs can be very large (up to 60,000), I need to split these tasks into chunks.

I tried the following approach. I defined a function that splits the list into smaller chunks:

def chunkList(self, listOfURLs, n):
    """Yield successive n-sized chunks from listOfURLs."""
    for i in range(0, len(listOfURLs), n):
        yield listOfURLs[i:i + n]

Then I run each chunk of listOfURLs like this:

tasksURL = []
chunkedListOfURLs = self.chunkList(listOfURLs, 5)
for URLList in chunkedListOfURLs:
    async with aiohttp.ClientSession() as session:
        for url in URLList:
            tasksURL.append(self.fetch(session, url))
        allHTMLs = await asyncio.gather(*tasksURL)
    for html in allHTMLs:
        with open("myPath.html", mode='w', encoding='UTF-8', errors='strict', buffering=1) as f:
            f.write(html)

I get the error:

RuntimeError: cannot reuse already awaited coroutine

I understand the problem, but I haven't found a way to solve it.

I'd suggest using asyncio.Queue in this case. You don't want to create 60k tasks, one for each URL. With a queue you can spawn a fixed number of workers and limit the queue size:

If maxsize is less than or equal to zero, the queue size is infinite. If it is an integer greater than 0, then await put() blocks when the queue reaches maxsize until an item is removed by get().

import asyncio
import random

WORKERS = 10


async def worker(q):
    while True:
        url = await q.get()
        t = random.uniform(1, 5)

        print(f"START: {url} ({t:.2f}s)")
        await asyncio.sleep(t)
        print(f"END: {url}")

        q.task_done()


async def main():
    q = asyncio.Queue(maxsize=100)

    tasks = []

    for _ in range(WORKERS):
        tasks.append(asyncio.create_task(worker(q)))

    for i in range(10):
        await q.put(f"http://example.com/{i}")

    await q.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())

Test:

$ python test.py
START: http://example.com/0 (1.14s)
START: http://example.com/1 (4.40s)
START: http://example.com/2 (2.48s)
START: http://example.com/3 (4.34s)
START: http://example.com/4 (1.94s)
END: http://example.com/0
START: http://example.com/5 (1.52s)
END: http://example.com/4
START: http://example.com/6 (4.84s)
END: http://example.com/2
START: http://example.com/7 (4.35s)
END: http://example.com/5
START: http://example.com/8 (2.33s)
END: http://example.com/3
START: http://example.com/9 (1.80s)
END: http://example.com/1
END: http://example.com/8
END: http://example.com/9
END: http://example.com/6
END: http://example.com/7

By the way, writing to a file blocks your main event loop, so either call it via run_in_executor or use aiofiles.
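If you go the aiofiles route, a minimal sketch might look like the following (assuming aiofiles is installed; the save_to_disk name and the path argument are placeholders, not part of the original code):

import aiofiles


async def save_to_disk(path, html):
    # aiofiles runs the blocking file I/O in a thread pool, so awaiting
    # write() does not block the event loop
    async with aiofiles.open(path, mode='w', encoding='UTF-8') as f:
        await f.write(html)

A worker could then await save_to_disk(...) directly instead of going through run_in_executor.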

Update Sat Apr 3 13:49:55 UTC 2021:

Example:

import asyncio
import traceback

import aiohttp

WORKERS = 5
URLS = [
    "http://airbnb.com",
    "http://amazon.co.uk",
    "http://amazon.com",
    "http://baidu.com",
    "http://basecamp.com",
    "http://bing.com",
    "http://djangoproject.com",
    "http://envato.com",
    "http://facebook.com",
    "http://github.com",
    "http://gmail.com",
    "http://google.co.uk",
    "http://google.com",
    "http://google.es",
    "http://google.fr",
    "http://heroku.com",
    "http://instagram.com",
    "http://linkedin.com",
    "http://live.com",
    "http://netflix.com",
    "http://rubyonrails.org",
    "http://shopify.com",
    "http://whosebug.com",
    "http://trello.com",
    "http://wordpress.com",
    "http://yahoo.com",
    "http://yandex.ru",
    "http://yiiframework.com",
    "http://youtube.com",
]


class Bot:
    async def fetch(self, client, url):
        async with client.get(url) as r:
            return await r.text()

    async def worker(self, q, client):
        loop = asyncio.get_running_loop()

        while True:
            url = await q.get()

            try:
                html = await self.fetch(client, url)
            except Exception:
                traceback.print_exc()
            else:
                await loop.run_in_executor(None, self.save_to_disk, url, html)
            finally:
                q.task_done()

    def save_to_disk(self, url, html):
        print(f"{url} ({len(html)})")


async def main():
    q = asyncio.Queue(maxsize=100)
    tasks = []

    async with aiohttp.ClientSession() as client:
        bot = Bot()

        for _ in range(WORKERS):
            tasks.append(asyncio.create_task(bot.worker(q, client)))

        for url in URLS:
            await q.put(url)

        await q.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())

In your example, the tasksURL list still holds the already-awaited coroutines from every chunk you have successfully processed. On subsequent iterations you append new coroutines to that same list, so when you call gather you end up trying to await the completed coroutines together with the new, not-yet-awaited ones. Simply creating a new tasksURL list for each chunk solves your problem:

for URLList in chunkedListOfURLs:
    tasksURL = []
    async with aiohttp.ClientSession() as session:
        for url in URLList:
            tasksURL.append(fetch(session, url))
        allHTMLs = await asyncio.gather(*tasksURL)

Note that by default, aiohttp's client session allows 100 concurrent connections. See https://docs.aiohttp.org/en/stable/client_advanced.html#limiting-connection-pool-size for more details, so you already get some concurrency limiting out of the box without chunking. Semaphores and queues, as mentioned in the other answer, are further options for limiting concurrency depending on your requirements.
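As a rough sketch of those two options (the limit of 10, the fetch helper and bounded_fetch are illustrative, not part of the original code):

import asyncio

import aiohttp


async def fetch(session, url):
    async with session.get(url) as r:
        return await r.text()


async def main(listOfURLs):
    # Option 1: cap concurrency at the connection-pool level
    # (the default limit is already 100)
    connector = aiohttp.TCPConnector(limit=10)

    # Option 2: cap concurrency explicitly with a semaphore
    sem = asyncio.Semaphore(10)

    async def bounded_fetch(session, url):
        async with sem:
            return await fetch(session, url)

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [bounded_fetch(session, url) for url in listOfURLs]
        return await asyncio.gather(*tasks)

Either mechanism on its own is enough to bound how many requests are in flight at once, so you can gather all coroutines in one go without chunking.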