Python aiohttp module: ambiguous .content attribute

Here is a small snippet of code:

import aiohttp
import aiofiles

async def fetch(url):
    # starting a session
    async with aiohttp.ClientSession() as session:
        # starting a get request
        async with session.get(url) as response:
            # getting response content
            content = await response.content
            return content
 
async def save_file(file_name, content):
    async with aiofiles.open(f'./binary/{file_name}', 'wb') as f:
        while True:
            chunk = content.read(1024)
            if not chunk:
                break
            f.write(chunk)

I'm trying to download some binary files with the aiohttp library and then pass them to a coroutine that uses the aiofiles library to write the files to disk. I have read the documentation but still couldn't figure out: can I pass content = await response.content, or is it closed when the async with ... handle is closed? Because on a secondary blog, I found:

According to aiohttp’s documentation, because the response object was created in a context manager, it technically calls release() implicitly.

This confuses me. Should I embed the logic of the second function inside the response handle, or is my logic correct?

The async context manager will close the resources associated with the request, so if you return from the function, you have to make sure you've read everything of interest. So you have two options:

  1. Read the whole response into memory, e.g. with content = await response.read(), or, if the file doesn't fit into memory (and also if you want to speed things up by reading and writing in parallel):
  2. Use a queue or an async iterator to parallelize reading and writing.
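
To see why option #1 insists on reading *inside* the context manager, here is a self-contained mock with no network involved. FakeResponse and FakeStream are hypothetical stand-ins I made up for aiohttp's response and its content stream; the only thing they model is that leaving the context releases the connection:

```python
import asyncio

class FakeStream:
    """Hypothetical stand-in for aiohttp's response.content stream."""
    def __init__(self, data):
        self._data = data
        self.closed = False

    async def read(self):
        if self.closed:
            raise RuntimeError("connection is closed")
        return self._data

class FakeResponse:
    """Hypothetical stand-in for an aiohttp response used as a context manager."""
    def __init__(self, data):
        self.content = FakeStream(data)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        self.content.closed = True  # leaving the context releases the connection

async def good_fetch():
    async with FakeResponse(b"payload") as resp:
        return await resp.content.read()  # read BEFORE the context closes

async def bad_fetch():
    async with FakeResponse(b"payload") as resp:
        return resp.content  # the handle escapes the context: now unusable

print(asyncio.run(good_fetch()))   # b'payload'
stream = asyncio.run(bad_fetch())
# stream.read() would now raise RuntimeError: the connection is already closed
```

The same applies to the real library: once the `async with session.get(url)` block exits, the content stream you smuggled out can no longer be read from.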

Here is an untested implementation of #2:

async def fetch(url):
    # return an async generator over contents of URL
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # getting response content in chunks no larger than 4K
            async for chunk in response.content.iter_chunked(4096):
                yield chunk
 
async def save_file(file_name, content_iter):
    async with aiofiles.open(f'./binary/{file_name}', 'wb') as f:
        async for chunk in content_iter:
            f.write(chunk)  # maybe you need to await this?

async def main():
    await save_file(file_name, fetch(url))
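
The producer/consumer shape of the code above can be checked without any network at all. This sketch swaps the HTTP stream for a hard-coded chunk source (fetch_mock is my stand-in, not part of aiohttp) and writes into an in-memory buffer instead of a file:

```python
import asyncio
import io

async def fetch_mock():
    # Stand-in for response.content.iter_chunked: yields the body in chunks.
    for chunk in (b"abc", b"def", b"g"):
        yield chunk  # async generator: the caller consumes it with `async for`

async def save_buffer(buf, chunk_iter):
    # Pulls chunks lazily, one at a time, as the producer yields them.
    async for chunk in chunk_iter:
        buf.write(chunk)

async def main():
    buf = io.BytesIO()
    await save_buffer(buf, fetch_mock())
    return buf.getvalue()

print(asyncio.run(main()))  # b'abcdefg'
```

Note that nothing runs when fetch_mock() is called; chunks are only produced as save_buffer iterates, which is exactly why the generator must be consumed while the session is still open.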

Thanks to user4815162342's code I was able to find a solution by parallelizing the fetch and write coroutines. I'll check his code as the accepted solution, but since I had to add some code to make it work, here it is:

import asyncio
import aiohttp
import aiofiles

# fetch binary from server
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            async for chunk in response.content.iter_chunked(4096):
                yield chunk

# write binary function
async def save_file(file_name, chunk_iter):
    list(map(create_dir_tree, list_binary_sub_dirs))
    async with aiofiles.open(f'./binary/bin_ts/{file_name}', 'wb') as f:
        async for chunk in chunk_iter:
            await f.write(chunk)
    

async def main(urls):
    tasks = []
    for url in urls:
        print('running on sublist')
        file_name = url.rpartition('/')[-1]
        request_ts = fetch(url)
        tasks.append(save_file(file_name, request_ts))
    await asyncio.gather(*tasks)

asyncio.run(main(some_list_of_urls))
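
One caveat with this final version: asyncio.gather starts every download at once, which can overwhelm the server (or your own file descriptors) for a long URL list. A common remedy is to cap concurrency with asyncio.Semaphore; here is a minimal sketch (the names bounded, run_all, and work are mine, for illustration only):

```python
import asyncio

async def bounded(sem, coro):
    # Run a coroutine only when a semaphore slot is free.
    async with sem:
        return await coro

async def run_all(coros, limit=5):
    # Like gather(), but at most `limit` coroutines are active at a time.
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*(bounded(sem, c) for c in coros))

# Dummy work items standing in for save_file(file_name, fetch(url)) coroutines.
async def work(i):
    await asyncio.sleep(0)
    return i

print(asyncio.run(run_all([work(i) for i in range(10)], limit=3)))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

gather preserves argument order in its result list regardless of which coroutine finishes first, so the throttling is invisible to the caller.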