使用 Asyncio 和 Aiohttp 下载数千张图片

Download thousands of images with Asyncio and Aiohttp

我一直在尝试在我的本地文件系统中下载数千张图片,但它没有正常工作,因为当我有一个名为 asyncio.exceptions.TimeoutError 的异常时下载了大约 5,000 张按目录分隔的图像。

我第一次执行下一个脚本时获得了 16.000 次下载,但每次执行它时,它都会减少下载的图像数量,目前我大约有 5,000 张图像。

这是我实现的脚本:

import os
import asyncio
import aiofiles
import async_timeout

from aiohttp import ClientSession
from generator import generate_hash
from logger import logger
from typing import List, Dict, Any

async def download_file(session: Any, remote_url: str, filename: str) -> None:
    try:
        async with async_timeout.timeout(120):
            async with session.get(remote_url) as response:
                if response.status == 200:
                    async with aiofiles.open(filename, mode='wb') as f:
                        async for data in response.content.iter_chunked(1024):
                            await f.write(data)
                else:
                    logger.error(f"Error to get {filename} from Remote Server")
    except asyncio.TimeoutError:
        logger.error(f"Timeout error to download {filename} into Local Server")
        raise
    
async def download_files(images: List[Dict[str, Any]], path: str) -> None:
    headers = {"user-agent": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"}
    async with ClientSession(headers=headers) as session:
        tasks = [asyncio.ensure_future(download_file(session, image['resource'], get_filename(image, path))) for image in images]
        await asyncio.gather(*tasks)
        
def download_images(images: List[Dict[str, Any]], path: str) -> None:
    try:
        loop = asyncio.get_event_loop()
        future = asyncio.ensure_future(download_files(images, path))
        loop.run_until_complete(future)
        logger.info(f'Images from Remote Server have been downloaded successfully')
    except Exception as error:
        logger.error(f'Error to download images from Remote Server: {error}')
        raise

def get_filename(image: Dict[str, Any], path: str) -> str:
    image_dir = '{}/{}'.format(path, image['id'])
    image_file = '{}.jpg'.format(generate_hash(image['resource']))
    if not os.path.exists(image_dir):
        os.makedirs(image_dir)
    return os.path.join(image_dir, image_file)


def main():
    images = [
                 {
                     'id': '10755431', 
                     'resource': 'http://image1.jpg'
                 }, 
                 {
                     'id': '10755432',
                     'resource': 'http://image2.jpg'
                 }, 
                 {
                     'id': '101426201',
                     'recurso': 'http://image3.jpg'
                 }
             ]
    IMAGES_PATH = '/home/stivenramireza'
    download_images(images, IMAGES_PATH)

if __name__ == "__main__":
    main()

我收到这个错误:

ERROR:root:Timeout error to download /home/stivenramireza/10755431/664e3bdd10cd69452774f38ec822a9eb.jpg into Local Server
ERROR:root:Error to download images from Remote Server: 
Traceback (most recent call last):
  File "/home/stivenramireza/storage/main.py", line 17, in download_file
    async for data in response.content.iter_chunked(1024):
  File "/home/stivenramireza/.local/lib/python3.8/site-packages/aiohttp/streams.py", line 39, in __anext__
    rv = await self.read_func()
  File "/home/stivenramireza/.local/lib/python3.8/site-packages/aiohttp/streams.py", line 368, in read
    await self._wait('read')
  File "/home/stivenramireza/.local/lib/python3.8/site-packages/aiohttp/streams.py", line 296, in _wait
    await waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "main.py", line 70, in <module>
    main()
  File "main.py", line 67, in main
    download_images(images, IMAGES_PATH)
  File "/home/stivenramireza/storage/main.py", line 34, in download_images
    loop.run_until_complete(future)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/stivenramireza/storage/main.py", line 28, in download_files
    await asyncio.gather(*[asyncio.ensure_future(download_file(session, image['recurso'], get_filename(image, path))) for image in images])
  File "/home/stivenramireza/storage/main.py", line 20, in download_file
    logger.error(f"Error to get {filename} from Re Server")
  File "/home/stivenramireza/.local/lib/python3.8/site-packages/async_timeout/__init__.py", line 55, in __aexit__
    self._do_exit(exc_type)
  File "/home/stivenramireza/.local/lib/python3.8/site-packages/async_timeout/__init__.py", line 92, in _do_exit
    raise asyncio.TimeoutError
asyncio.exceptions.TimeoutError

我该怎么办?

提前致谢。

您的 download_file 函数捕获超时错误 并重新引发它 。您的 download_files 函数使用 asyncio.gather() ,它在第一个异常时退出并将其传播给调用者。可以合理地假设,当下载大量文件时,其中一个文件迟早会超时,在这种情况下,您的整个程序都会中断。

What should I do?

这取决于您希望程序在超时情况下执行的操作。例如,您可能想要重试该文件,或者您可能想要放弃。但是您很可能不想因为单个文件超时而中断整个下载。

虽然在很多情况下重新引发您捕捉到的异常是正确的做法,但在这里不是正确的做法。您可以将 download_file 末尾的 raise 更改为 return (remote_url, filename),这将导致 gather() 返回失败下载列表,您可以尝试重新下载。