How to create Python3 Asynchronous Web Requests with asyncio & aiohttp?

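I'm trying to wrap my head around the asyncio library. I thought you could simply mark the parts of the code you want to run asynchronously, but in all the examples I've seen, people tend to define their main function as async. Here is the code I wrote: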

async def download_post(id, path):
    print(f"Begin downloading {id}")
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{apiurl}item/{id}.json?print=pretty") as resp:
            json = await resp.json()
            content = await resp.text()
            print(f"Done downloading {id}")
            if json["type"] == "story":
                print(f"Begin writing to {id}.json")
                with open(os.path.join(path, f"{id}.json"), "w") as file:
                    file.write(content)
                    print(f"Done writing to {id}.json")

async def update_posts(path):
    myid = get_myid(path)
    if myid < maxid: # Database can be updated
        for id in range(myid+1, maxid):
            await download_post(id, path)


def main():
    if not os.path.exists(posts_dir):
        os.makedirs(posts_dir)
    path = os.path.join(os.getcwd(), posts_dir)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(update_posts(path))
    loop.close()
    #domain_counts(path)

if __name__ == '__main__':
    main()

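The key point here is that range(myid+1, maxid) is very large and requests.get() takes a fairly long time. However, after switching from requests to aiohttp with asyncio, I still get the responses back one at a time, as the output below shows: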

Begin downloading 1
Done downloading 1
Begin writing to 1.json
Done writing to 1.json
Begin downloading 2
Done downloading 2
Begin writing to 2.json
Done writing to 2.json
Begin downloading 3
Done downloading 3
Begin writing to 3.json
Done writing to 3.json
Begin downloading 4
Done downloading 4
Begin writing to 4.json
Done writing to 4.json

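I thought about splitting the download and file-writing code into separate functions, but I'm not sure whether both of those functions would also have to be async. I also assume I would have to prefix many variables with await. These are the resources I've consulted: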

  1. Easy parallel HTTP requests with Python and asyncio
  2. Intro to aiohttp
  3. Making Concurrent HTTP requests with Python AsyncIO

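Does anyone have good resources I could look at to better understand what I'm doing wrong? I've noticed that many examples use asyncio.gather(), but I don't quite understand how it is used. Do I need to put async in front of every function/with and await in front of every variable?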

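A few important points:

  • The Python interpreter has the GIL and the asyncio event loop runs on a single thread, so technically nothing is actually running in parallel.
  • The catch, however, is that most I/O operations tie up the program while the CPU sits idle the whole time. This is where libraries like asyncio can help.
  • They try to minimize CPU idle time by running other tasks from your queue while a major I/O operation is waiting for its result.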
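
The effect described in the points above can be seen in a small timing sketch. Nothing here comes from the question: fake_io() and the delays are made up for illustration, and asyncio.sleep() merely stands in for a slow network request.

```python
import asyncio
import time


async def fake_io(delay: float) -> float:
    # asyncio.sleep() stands in for a network request: the coroutine waits
    # without using the CPU, so the event loop is free to run other work.
    await asyncio.sleep(delay)
    return delay


async def run_sequentially(delays):
    # Each await must finish before the next coroutine is even started.
    return [await fake_io(d) for d in delays]


async def run_concurrently(delays):
    # gather() schedules all coroutines at once and waits for all of them.
    return await asyncio.gather(*(fake_io(d) for d in delays))


def timed(coro_fn, delays):
    # Run one of the two variants and measure the wall-clock time it takes.
    start = time.perf_counter()
    result = asyncio.run(coro_fn(delays))
    return result, time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.2, 0.2, 0.2]
    _, sequential = timed(run_sequentially, delays)
    _, concurrent = timed(run_concurrently, delays)
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential variant takes roughly the sum of the delays, while the concurrent one takes roughly the longest single delay, even though everything runs on one thread.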

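In your case, update_posts() doesn't really look like an asynchronous method in the ideal sense, because technically it is only used to decide which posts get downloaded and written.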

And since we're already talking about downloading and writing, note that you can actually run them as independent tasks to keep downtime to a minimum.
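
As a rough illustration of that split, and of how asyncio.gather() fits in, here is a minimal sketch. It is not the code from the question: ids_to_download() is a hypothetical helper, and download_post() is a simulated version in which asyncio.sleep() replaces the real request. Only the function that actually waits on I/O is declared async.

```python
import asyncio


def ids_to_download(myid: int, maxid: int) -> range:
    # Plain function: it only computes which ids are missing and never waits
    # on anything, so it does not need to be async.
    return range(myid + 1, maxid)


async def download_post(post_id: int) -> str:
    # Hypothetical stand-in for the real request; this is the part that waits.
    await asyncio.sleep(0.05)
    return f"content of {post_id}"


async def update_posts(myid: int, maxid: int) -> list:
    coros = [download_post(post_id) for post_id in ids_to_download(myid, maxid)]
    # gather() runs all coroutines concurrently and returns their results
    # in the same order as the inputs.
    return await asyncio.gather(*coros)


if __name__ == "__main__":
    print(asyncio.run(update_posts(0, 4)))
```

With a very large id range, an unbounded gather() would start every request at once, so in practice the number of requests in flight is usually limited, for example with an asyncio.Semaphore or a fixed number of workers reading from a queue.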

Here is the approach I would probably take:

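import asyncio
from asyncio import Queue
import aiohttp
import os

# apiurl, maxid, posts_dir and get_myid() are assumed to be defined elsewhere,
# as in the question's code.


async def generate_download_post_tasks(path, queue: Queue):
    # Producer: queue every post id that still has to be downloaded.
    myid = get_myid(path)
    if myid < maxid:  # Database can be updated
        for id in range(myid + 1, maxid):
            queue.put_nowait((id, path))


async def download_post_tasks(download_queue: Queue, write_queue: Queue):
    # Download worker: fetch each queued post and pass stories on to the writer.
    async with aiohttp.ClientSession() as session:
        while True:
            download_request_id, path = await download_queue.get()
            try:
                async with session.get(f"{apiurl}item/{download_request_id}.json?print=pretty") as resp:
                    data = await resp.json()  # parsed body, used to check the post type
                    content = await resp.text()
                print(f"Done downloading {download_request_id}")
                if data["type"] == "story":
                    write_queue.put_nowait((download_request_id, content, path))
            except aiohttp.ClientError as exc:
                print(f"Failed to download {download_request_id}: {exc}")
            finally:
                # Mark the item as handled so download_queue.join() can return.
                download_queue.task_done()


async def write_post_tasks(write_queue: Queue):
    # Write worker: save each downloaded story to disk.
    while True:
        post_id, post_content, path = await write_queue.get()
        print(f"Begin writing to {post_id}.json")
        with open(os.path.join(path, f"{post_id}.json"), "w") as file:
            file.write(post_content)
        print(f"Done writing to {post_id}.json")
        write_queue.task_done()


async def async_main():
    os.makedirs(posts_dir, exist_ok=True)
    path = os.path.join(os.getcwd(), posts_dir)

    download_queue = Queue()
    write_queue = Queue()

    # The producer never waits on I/O, so run it to completion before the
    # workers start; otherwise join() could return on a still-empty queue.
    await generate_download_post_tasks(path=path, queue=download_queue)
    workers = [
        asyncio.create_task(download_post_tasks(download_queue=download_queue, write_queue=write_queue)),
        asyncio.create_task(write_post_tasks(write_queue=write_queue)),
    ]

    wait_time = 100  # overall time limit in seconds

    async def drain():
        # Every download is queued for writing before it is marked done,
        # so waiting in this order covers all pending writes.
        await download_queue.join()
        await write_queue.join()

    try:
        await asyncio.wait_for(drain(), wait_time)
    except asyncio.TimeoutError:
        print("Timed out before every post was processed")
    finally:
        # The workers loop forever, so cancel them once the queues are drained.
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    print("End!!")


if __name__ == '__main__':
    asyncio.run(async_main())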
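
One thing to keep in mind about the queue-based code above: it starts a single download worker, so requests are still sent one after another and only the downloading and writing overlap. A possible extension, sketched here under assumptions, is to start several workers on the same queue. The fetch() coroutine, the worker() and download_all() names, and the num_workers value are hypothetical; fetch() uses asyncio.sleep() in place of the aiohttp request in download_post_tasks().

```python
import asyncio


async def fetch(post_id: int) -> str:
    # Hypothetical stand-in for the aiohttp request in download_post_tasks().
    await asyncio.sleep(0.1)
    return f"post {post_id}"


async def worker(queue: asyncio.Queue, results: dict) -> None:
    # Each worker pulls ids from the shared queue until it is cancelled.
    while True:
        post_id = await queue.get()
        try:
            results[post_id] = await fetch(post_id)
        finally:
            # Always mark the item as handled so queue.join() can return.
            queue.task_done()


async def download_all(post_ids, num_workers: int = 5) -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    for post_id in post_ids:
        queue.put_nowait(post_id)

    results: dict = {}
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(num_workers)]
    await queue.join()  # returns once task_done() was called for every item
    for task in workers:
        task.cancel()
    # Wait for the cancellations to finish; the CancelledError results are ignored.
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(asyncio.run(download_all(range(1, 11))))
```

The number of workers caps how many requests are in flight at the same time, which tends to be friendlier to the remote API than starting one task per id.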