How to create Python3 Asynchronous Web Requests with asyncio & aiohttp?
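I'm trying to wrap my head around the asyncio library. I thought you could simply mark the parts of the code you want to run asynchronously, but in all the examples I've seen, people tend to define their main function as async. Here is the code I've written: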
```python
import asyncio
import os

import aiohttp

# apiurl, maxid, posts_dir and get_myid() are defined elsewhere in the script (not shown)


async def download_post(id, path):
    print(f"Begin downloading {id}")
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{apiurl}item/{id}.json?print=pretty") as resp:
            json = await resp.json()
            content = await resp.text()
            print(f"Done downloading {id}")
            if json["type"] == "story":
                print(f"Begin writing to {id}.json")
                with open(os.path.join(path, f"{id}.json"), "w") as file:
                    file.write(content)
                print(f"Done writing to {id}.json")


async def update_posts(path):
    myid = get_myid(path)
    if myid < maxid:  # Database can be updated
        for id in range(myid+1, maxid):
            await download_post(id, path)


def main():
    if not os.path.exists(posts_dir):
        os.makedirs(posts_dir)
    path = os.path.join(os.getcwd(), posts_dir)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(update_posts(path))
    loop.close()
    #domain_counts(path)


if __name__ == '__main__':
    main()
```
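The key point here is that `range(myid+1, maxid)` is very large, and `requests.get()` takes quite a long time. However, after switching from requests to aiohttp with asyncio, I still get the responses one at a time, as the output below shows: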
```
Begin downloading 1
Done downloading 1
Begin writing to 1.json
Done writing to 1.json
Begin downloading 2
Done downloading 2
Begin writing to 2.json
Done writing to 2.json
Begin downloading 3
Done downloading 3
Begin writing to 3.json
Done writing to 3.json
Begin downloading 4
Done downloading 4
Begin writing to 4.json
Done writing to 4.json
```
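I've thought about splitting the downloading and the file-writing code into separate functions, but I'm not sure whether those two functions would also have to be async. Also, I assume I'd have to prefix many of my variables with `await`. These are the resources I've been referring to: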
- Easy parallel HTTP requests with Python and asyncio
- Intro to aiohttp
- Making Concurrent HTTP requests with Python AsyncIO
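
Does anyone have some good resources I could look at to better understand what I'm doing wrong? I've noticed that a lot of examples use `asyncio.gather()`, but I don't really understand how it's used. Do I need to put `async` in front of every function and `with`, and `await` in front of every variable?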
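Two important things:
- Because of the GIL, CPython executes Python bytecode on only one thread at a time, and asyncio runs all of its coroutines on a single thread (the event loop); so technically you are not running things in parallel, only concurrently.
- The catch, however, is that most I/O operations spend most of their time waiting (on the network, on the disk) while your CPU sits idle. This is where libraries like asyncio can help you.
  - They try to minimize CPU idle time by letting the event loop run other scheduled tasks while an I/O operation is waiting for its result.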
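
To make the single-thread point concrete, here is a small self-contained timing sketch. It assumes `asyncio.sleep()` is a fair stand-in for waiting on the network (no real requests are made). Awaiting five 0.1-second "requests" one after another, which is the pattern in the question, takes about 0.5 s; handing the same five coroutines to `asyncio.gather()` takes about 0.1 s, because the event loop runs the others while each one waits, all on one thread.

```python
import asyncio
import time


async def fake_request(delay: float) -> float:
    # Stand-in for a network call: while this coroutine is suspended in sleep(),
    # the event loop is free to run other coroutines.
    await asyncio.sleep(delay)
    return delay


async def sequential(n: int, delay: float) -> float:
    start = time.perf_counter()
    for _ in range(n):
        await fake_request(delay)  # each await completes before the next call starts
    return time.perf_counter() - start


async def concurrent(n: int, delay: float) -> float:
    start = time.perf_counter()
    # gather() schedules all coroutines at once and waits until every one is done
    await asyncio.gather(*(fake_request(delay) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    seq = asyncio.run(sequential(5, 0.1))
    con = asyncio.run(concurrent(5, 0.1))
    print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")
```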
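In your case, `update_posts()` isn't really an async method in the ideal sense: technically it only decides which posts to download and write. Moreover, because it `await`s each `download_post(id, path)` call inside the `for` loop, it is suspended until that download (and write) has finished before the next one even starts, which is why your output is strictly one-by-one.
And since we are already talking about downloading and writing, notice that you can actually run them as independent tasks, so that as little time as possible is spent waiting.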
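
A smaller change that keeps the question's own structure is to create all the download coroutines first and wait for them together with `asyncio.gather()`, capping the number of simultaneous requests with an `asyncio.Semaphore`. The sketch below runs offline under stated assumptions: `fetch_post` is a hypothetical stand-in for the aiohttp request (it only sleeps), and `MAX_CONCURRENCY` is a hypothetical limit. It also touches on the last part of the question: only functions that `await` something need `async def`. `save_post` is an ordinary function called without `await`, and `await` goes in front of awaitable calls, not in front of variables.

```python
import asyncio
import json
import os
import tempfile
import time

MAX_CONCURRENCY = 10  # hypothetical limit on simultaneous requests


def save_post(path: str, post_id: int, content: str) -> None:
    # A plain function: a short local file write does not have to be a
    # coroutine, and a coroutine can call it directly without `await`.
    with open(os.path.join(path, f"{post_id}.json"), "w") as file:
        file.write(content)


async def fetch_post(post_id: int) -> dict:
    # Hypothetical stand-in for the aiohttp request. In real code this would be
    # roughly: async with session.get(url) as resp: return await resp.json()
    await asyncio.sleep(0.05)
    return {"id": post_id, "type": "story" if post_id % 2 else "comment"}


async def download_post(post_id: int, path: str, sem: asyncio.Semaphore) -> dict:
    async with sem:  # waits here if MAX_CONCURRENCY requests are already running
        data = await fetch_post(post_id)
    if data["type"] == "story":
        save_post(path, post_id, json.dumps(data))  # regular call, no await
    return data


async def update_posts(path: str, first_id: int, last_id: int) -> list:
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    # One coroutine per id; gather() runs them concurrently and returns
    # their results in the same order as the ids.
    return await asyncio.gather(
        *(download_post(i, path, sem) for i in range(first_id, last_id))
    )


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        start = time.perf_counter()
        results = asyncio.run(update_posts(tmp, 1, 21))
        print(f"{len(results)} posts in {time.perf_counter() - start:.2f}s")
```

In the real script you would probably create one `aiohttp.ClientSession` in `update_posts()` and pass it down to every download, rather than opening a new session per post as `download_post()` in the question does.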
Here is how I would probably approach it:
```python
import asyncio
from asyncio import Queue
import os

import aiohttp

# apiurl, maxid, posts_dir and get_myid() come from the question's script (not shown)


async def generate_download_post_tasks(path, queue: Queue):
    # Producer: enqueue every post id that still has to be fetched
    myid = get_myid(path)
    if myid < maxid:  # Database can be updated
        for id in range(myid+1, maxid):
            queue.put_nowait((id, path))


async def download_post_tasks(download_queue: Queue, write_queue: Queue):
    # Consumer 1: fetch posts and hand stories over to the writer.
    # Note: this is a single loop, so posts are still fetched one at a time.
    async with aiohttp.ClientSession() as session:
        while True:
            download_request_id, path = await download_queue.get()
            async with session.get(f"{apiurl}item/{download_request_id}.json?print=pretty") as resp:
                json = await resp.json()
                content = await resp.text()
                print(f"Done downloading {download_request_id}")
                if json["type"] == "story":
                    write_queue.put_nowait((download_request_id, content, path))


async def write_post_tasks(write_queue: Queue):
    # Consumer 2: write each story to disk as it arrives
    while True:
        post_id, post_content, path = await write_queue.get()
        print(f"Begin writing to {post_id}.json")
        with open(os.path.join(path, f"{post_id}.json"), "w") as file:
            file.write(post_content)
        print(f"Done writing to {post_id}.json")


async def async_main():
    if not os.path.exists(posts_dir):
        os.makedirs(posts_dir)
    path = os.path.join(os.getcwd(), posts_dir)
    tasks = set()
    download_queue = Queue()
    write_queue = Queue()
    tasks.add(asyncio.create_task(generate_download_post_tasks(path=path, queue=download_queue)))
    tasks.add(asyncio.create_task(download_post_tasks(download_queue=download_queue, write_queue=write_queue)))
    tasks.add(asyncio.create_task(write_post_tasks(write_queue=write_queue)))
    wait_time = 100
    try:
        # Both consumers loop forever, so gather() never finishes on its own;
        # wait_for() cancels all tasks once wait_time seconds have passed.
        await asyncio.wait_for(asyncio.gather(*tasks), wait_time)
    except asyncio.TimeoutError:
        print("End!!")


if __name__ == '__main__':
    asyncio.run(async_main())
```
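
The code above runs only one download loop, so posts are still fetched one at a time, and because both consumers loop forever the program only stops when the 100-second timeout fires. A common alternative with the same producer/consumer queues is to start several download workers and stop when the queues are drained, using `Queue.task_done()` and `Queue.join()`. This is a minimal offline sketch, not the answer's original code: `fake_download` is a hypothetical stand-in for the aiohttp request, and `NUM_DOWNLOADERS` is a hypothetical pool size.

```python
import asyncio
import os
import tempfile
import time

NUM_DOWNLOADERS = 5  # hypothetical: how many downloads may be in flight at once


async def fake_download(post_id: int) -> str:
    # Hypothetical stand-in for session.get(...) followed by resp.text()
    await asyncio.sleep(0.05)
    return f'{{"id": {post_id}, "type": "story"}}'


async def download_worker(download_queue: asyncio.Queue, write_queue: asyncio.Queue):
    while True:
        post_id, path = await download_queue.get()
        try:
            content = await fake_download(post_id)
            write_queue.put_nowait((post_id, content, path))
        except Exception as exc:  # one failing post should not kill the worker
            print(f"Failed to download {post_id}: {exc!r}")
        finally:
            download_queue.task_done()  # lets download_queue.join() return


async def write_worker(write_queue: asyncio.Queue):
    while True:
        post_id, content, path = await write_queue.get()
        try:
            with open(os.path.join(path, f"{post_id}.json"), "w") as file:
                file.write(content)
        except Exception as exc:
            print(f"Failed to write {post_id}: {exc!r}")
        finally:
            write_queue.task_done()


async def download_all(path: str, ids) -> None:
    download_queue: asyncio.Queue = asyncio.Queue()
    write_queue: asyncio.Queue = asyncio.Queue()
    for post_id in ids:
        download_queue.put_nowait((post_id, path))

    workers = [asyncio.create_task(download_worker(download_queue, write_queue))
               for _ in range(NUM_DOWNLOADERS)]
    workers.append(asyncio.create_task(write_worker(write_queue)))

    await download_queue.join()  # every id has been downloaded (or has failed)
    await write_queue.join()     # every downloaded post has been written
    for task in workers:         # the workers loop forever, so stop them explicitly
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        start = time.perf_counter()
        asyncio.run(download_all(tmp, range(1, 11)))
        print(f"{len(os.listdir(tmp))} files in {time.perf_counter() - start:.2f}s")
```

Waiting on `download_queue.join()` before `write_queue.join()` matters: each worker puts its result on the write queue before calling `task_done()`, so by the time the first join returns, every post that needs writing is already queued.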