Python async/await downloading a list of urls
I'm trying to download more than 30,000 files from an FTP server, and after some googling, using asynchronous I/O seemed like a good idea. However, the code below fails to download any files and returns a timeout error. I'd really appreciate any help! Thanks!
import asyncio
import gzip
import os
import shutil

import aiohttp
import async_timeout


class pdb:
    def __init__(self):
        self.ids = []
        self.dl_id = []
        self.err_id = []

    async def download_file(self, session, url):
        try:
            with async_timeout.timeout(10):
                async with session.get(url) as remotefile:
                    if remotefile.status == 200:
                        data = await remotefile.read()
                        return {"error": "", "data": data}
                    else:
                        return {"error": remotefile.status, "data": ""}
        except Exception as e:
            return {"error": e, "data": ""}

    async def unzip(self, session, work_queue):
        while not work_queue.empty():
            queue_url = await work_queue.get()
            print(queue_url)
            data = await self.download_file(session, queue_url)
            id = queue_url[-11:-7]
            ID = id.upper()
            if not data["error"]:
                saved_pdb = os.path.join("./pdb", ID, f'{ID}.pdb')
                if ID not in self.dl_id:
                    self.dl_id.append(ID)
                with open(f"{id}.ent.gz", 'wb') as f:
                    f.write(data["data"].read())
                with gzip.open(f"{id}.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile:
                    shutil.copyfileobj(inFile, outFile)
                os.remove(f"{id}.ent.gz")
            else:
                self.err_id.append(ID)

    def download_queue(self, urls):
        loop = asyncio.get_event_loop()
        q = asyncio.Queue(loop=loop)
        [q.put_nowait(url) for url in urls]
        con = aiohttp.TCPConnector(limit=10)
        with aiohttp.ClientSession(loop=loop, connector=con) as session:
            tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))]
            loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()
Error message when the try part is removed:
Traceback (most recent call last):
File "test.py", line 111, in
x.download_queue(urls)
File "test.py", line 99, in download_queue
loop.run_until_complete(asyncio.gather(*tasks))
File "/home/yz/miniconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
return future.result()
File "test.py", line 73, in unzip
data = await self.download_file(session, queue_url)
File "test.py", line 65, in download_file
return {"error": remotefile.status, "data": ""}
File "/home/yz/miniconda3/lib/python3.6/site- packages/async_timeout/init.py", line 46, in exit
raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError
tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))]
loop.run_until_complete(asyncio.gather(*tasks))
Here you start downloading all the urls at the same time, which means you also start counting the timeout for all of them at the same time. Once the number is as big as 30,000, the whole batch cannot finish within 10 seconds given network/RAM/CPU capacity.
To avoid this, you should limit how many coroutines are started concurrently. This is usually done with a synchronization primitive such as asyncio.Semaphore.
It would look like this:
sem = asyncio.Semaphore(10)

# ...

async def download_file(self, session, url):
    try:
        async with sem:  # Don't start the next download while 10 others are currently running
            with async_timeout.timeout(10):
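For completeness, a minimal self-contained sketch of how the semaphore-guarded version might look, keeping the rest of download_file as in the question (the limit of 10 concurrent downloads and the 10-second timeout are assumptions carried over from the original code):

import asyncio
import aiohttp
import async_timeout

sem = asyncio.Semaphore(10)  # at most 10 downloads may be in flight at once

async def download_file(session, url):
    try:
        async with sem:  # wait for a free slot before starting the download
            with async_timeout.timeout(10):  # the timeout now covers only an active download
                async with session.get(url) as remotefile:
                    if remotefile.status == 200:
                        data = await remotefile.read()
                        return {"error": "", "data": data}
                    return {"error": remotefile.status, "data": ""}
    except Exception as e:
        return {"error": e, "data": ""}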
As an alternative to @MikhailGerasimov's semaphore approach, you might consider using the aiostream.stream.map operator:
from aiostream import stream, pipe

async def main(urls):
    async with aiohttp.ClientSession() as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=10)
        zs = stream.map(ys, process)
        await zs
Here is an equivalent implementation using pipes:
async def main3(urls):
    async with aiohttp.ClientSession() as session:
        await (stream.repeat(session)
               | pipe.zip(stream.iterate(urls))
               | pipe.starmap(fetch, ordered=False, task_limit=10)
               | pipe.map(process))
You can test it with the following coroutines:
async def fetch(session, url):
    await asyncio.sleep(random.random())
    return url

async def process(data):
    print(data)
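For illustration only, a hypothetical variant of fetch and process adapted to the question's use case could download the gzipped payload and write the decompressed .pdb file to disk, with the pipeline driven from a plain event loop. The example URL, the [-11:-7] slicing and the ./pdb output layout below are assumptions carried over from the question, and main() refers to the coroutine defined above:

import asyncio
import gzip
import os

import aiohttp

async def fetch(session, url):
    # Download one file and pass the url along with the raw payload.
    async with session.get(url) as resp:
        resp.raise_for_status()
        return url, await resp.read()

async def process(item):
    # Decompress the gzipped payload in memory and save it as <ID>.pdb.
    url, payload = item
    file_id = url[-11:-7]  # same slicing as in the question
    out_dir = os.path.join("./pdb", file_id.upper())
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, f"{file_id.upper()}.pdb"), "wb") as out_file:
        out_file.write(gzip.decompress(payload))

# Hypothetical URL; substitute the real list of 30,000+ files.
urls = ["https://files.example.org/pdb1abc.ent.gz"]

loop = asyncio.get_event_loop()
loop.run_until_complete(main(urls))  # main() as defined above
loop.close()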
See more aiostream examples in this demonstration and the documentation.
Disclaimer: I am the project maintainer.