Efficiently download files asynchronously with requests
I want to download files as fast as possible in Python. Here is my code:
import pandas as pd
import requests
from requests_futures.sessions import FuturesSession
import os
import pathlib
from timeit import default_timer as timer


class AsyncDownloader:
    """Download files asynchronously"""

    __urls = set()
    __dest_path = None
    __user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'
    __read_timeout = 60
    __connection_timeout = 30
    __download_count = 0  # unlimited
    # http://www.browserscope.org/?category=network
    __worker_count = 17  # No of threads to spawn
    __chunk_size = 1024
    __download_time = -1
    __errors = []

    # TODO Fetch only content of a specific type from a csv
    # TODO Improve code structure so that it can be used as a commandline tool

    def set_source_csv(self, source_path, column_name):
        self.source_path = source_path
        self.column_name = column_name
        try:
            my_csv = pd.read_csv(source_path, usecols=[self.column_name], chunksize=10)
        except ValueError:
            print("The column name doesn't exist")
            return
        else:
            # No exception whatsoever
            for chunk in my_csv:
                AsyncDownloader.__urls.update(set(getattr(chunk, self.column_name)))

    def set_destination_path(self, dest_path):
        if dest_path.endswith('/'):
            dest_path = dest_path[:-1]
        self.dest_path = dest_path
        # TODO Add exception in case we can't create the directory
        pathlib.Path(self.dest_path).mkdir(parents=True, exist_ok=True)
        if os.access(self.dest_path, os.W_OK):
            AsyncDownloader.__dest_path = pathlib.Path(self.dest_path).resolve()

    def set_user_agent(self, useragent):
        self.useragent = useragent
        AsyncDownloader.__user_agent = self.useragent

    def set_connection_timeout(self, ctimeout_secs):
        self.timeout_secs = ctimeout_secs
        if self.timeout_secs >= 0:
            AsyncDownloader.__connection_timeout = self.timeout_secs

    def set_read_timeout(self, rtimeout_secs):
        self.timeout_secs = rtimeout_secs
        if self.timeout_secs >= 0:
            AsyncDownloader.__read_timeout = self.timeout_secs

    def set_download_count(self, file_count):
        self.file_count = file_count
        if self.file_count > 0:
            AsyncDownloader.__download_count = self.file_count

    def set_worker_count(self, worker_count):
        self.worker_count = worker_count
        if self.worker_count > 0:
            AsyncDownloader.__worker_count = self.worker_count

    def set_chunk_size(self, chunk_size):
        self.chunk_size = chunk_size
        if self.chunk_size > 0:
            AsyncDownloader.__chunk_size = self.chunk_size

    def print_urls(self):
        print(AsyncDownloader.__urls)

    def get_download_time(self):
        return AsyncDownloader.__download_time

    def get_errors(self):
        return AsyncDownloader.__errors

    def download(self):
        start = timer()
        try:
            session = FuturesSession(max_workers=AsyncDownloader.__worker_count)
            session.headers.update({'user-agent': AsyncDownloader.__user_agent})
            session.request(AsyncDownloader.__connection_timeout,
                            AsyncDownloader.__connection_timeout, stream=True)
            results = []
            # Give an accurate file count even if we don't have to download it as it already exists
            file_count = 0
            for url in AsyncDownloader.__urls:
                filename = os.path.basename(url)
                # check if we need only a limited number of files
                if AsyncDownloader.__download_count != 0:
                    # No need to download the file if it already exists
                    if pathlib.Path(AsyncDownloader.__dest_path / filename).is_file():
                        file_count += 1
                        continue
                    else:
                        if file_count < AsyncDownloader.__download_count:
                            file_count += 1
                            results.append(session.get(url))
                else:
                    if not pathlib.Path(AsyncDownloader.__dest_path / filename).is_file():
                        results.append(session.get(url))
            for result in results:
                # wait for the response to complete, if it hasn't already
                response = result.result()
                filename = os.path.basename(response.url)
                if response.status_code == 200:
                    with open(pathlib.Path(AsyncDownloader.__dest_path / filename).resolve(), 'wb') as fd:
                        for chunk in response.iter_content(chunk_size=AsyncDownloader.__chunk_size):
                            if chunk:  # filter out keep-alive new chunks
                                fd.write(chunk)
            end = timer()
            AsyncDownloader.__download_time = end - start
        except requests.exceptions.HTTPError as errh:
            AsyncDownloader.__errors.append("Http Error: " + str(errh))
            # print("Http Error:", errh)
        except requests.exceptions.ConnectionError as errc:
            AsyncDownloader.__errors.append("Error Connecting: " + str(errc))
            # print("Error Connecting:", errc)
        except requests.exceptions.Timeout as errt:
            AsyncDownloader.__errors.append("Timeout Error: " + str(errt))
            # print("Timeout Error:", errt)
        except requests.exceptions.RequestException as err:
            AsyncDownloader.__errors.append("Oops: Something else: " + str(err))
        else:
            return
The code below makes a very bad assumption. Indeed, I assume that the first URL will finish first, which of course is not correct.
# wait for the response to complete, if it hasn't already
response = result.result()
How can I make sure that, in an efficient way, only requests that have completed are processed, instead of making an assumption like the one above?
I would also appreciate any other suggestions on how to improve performance.
Kind regards
Even if the connections completed in order, you are still processing the files sequentially: the second file has to wait for the first one to be written, and so on. So the best you can do is process everything in parallel (this can be done despite the GIL, because I/O operations such as writing to disk and reading from the network release it). Basically, use the regular requests library (not requests-futures) and create one future/thread per request + file processing.
There are more ways to make it faster, such as continuing to download chunks while you write them (i.e. two threads, one for the request and one for file handling), or reading chunks in parallel by issuing multi-part requests. That is "download accelerator" territory, and you may not want that kind of complexity in your code.
Edit: Also, chunked downloads are lazy, which means you are only issuing the initial requests in parallel, but the actual chunked file download happens sequentially, since it is done in the main thread. So your current approach is not much better than a fully synchronous one. The advice above still stands.
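A minimal sketch of the suggestion above (one thread per request + file handling), using a plain ThreadPoolExecutor from the standard library; the helper names (download_one, download_all, dest_dir) and the timeout values are illustrative, not part of the original code:

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


def download_one(url, dest_dir):
    # The request, the chunked read and the disk write all happen inside
    # this worker thread, so no download waits for another file's I/O.
    path = os.path.join(dest_dir, os.path.basename(url))
    with requests.get(url, stream=True, timeout=(30, 60)) as response:
        response.raise_for_status()
        with open(path, 'wb') as fd:
            for chunk in response.iter_content(chunk_size=1024):
                fd.write(chunk)
    return url


def download_all(urls, dest_dir='.', workers=17):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(download_one, url, dest_dir) for url in urls]
        for future in as_completed(futures):  # yields whichever future finishes first
            print('Finished', future.result())

Note that as_completed also answers the original question directly: it hands futures back in the order they complete, not the order they were submitted.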
To exercise your code, I created a .csv file containing links to the robots.txt files of several websites, in the following order: GitHub, Udemy, YouTube.
After debugging, the first results at
response = result.result()
came back (in this order): Udemy, YouTube, GitHub.
For the record, the size of each robots.txt increased in the same order in which I got the results back.
This means there was no problem to begin with: even though I set up the .csv file in a specific order, the results arrived in whatever order the files finished downloading.
I would appreciate any other suggestion on how to improve performance.
As for performance, you could speed things up by creating a thread that writes the responses to file, or by using an async IO library such as Tinche/aiofiles.
If you want to go even further, you can try to improve the performance of the program itself by using an alternative implementation of Python, such as PyPy.
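For illustration, here is a minimal sketch of the aiofiles idea; it assumes aiohttp for the asynchronous HTTP side (the answer above only names aiofiles), and the helper names are made up for the example:

import asyncio
import os

import aiofiles  # async file writes, as suggested above
import aiohttp   # assumed here for the async HTTP requests


async def fetch_one(session, url, dest_dir):
    # Stream the response and hand each chunk to aiofiles, so neither the
    # download nor the disk write blocks the event loop.
    path = os.path.join(dest_dir, os.path.basename(url))
    async with session.get(url) as response:
        async with aiofiles.open(path, 'wb') as fd:
            async for chunk in response.content.iter_chunked(1024):
                await fd.write(chunk)


async def fetch_all(urls, dest_dir='.'):
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(fetch_one(session, url, dest_dir) for url in urls))

# asyncio.run(fetch_all(['https://example.com/robots.txt']))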
The simplest way to do this requires no threads or special async code at all: just use the regular requests library and its built-in streaming option. You say response = session.get(url, stream=True) and then use response.iter_content(chunk_size=1024) (for example) to access the downloaded information one chunk at a time. Here's a functional example:
import requests
import os


def stream_multiple(urls):
    responses = {url: requests.get(url, stream=True) for url in urls}
    streams = {url: responses[url].iter_content(chunk_size=1024)
               for url in urls}
    handles = {url: open(os.path.basename(url), 'wb') for url in urls}
    while streams:
        for url in list(streams.keys()):
            try:
                chunk = next(streams[url])
                print("Received {} bytes for {}".format(len(chunk), url))
                handles[url].write(chunk)
            except StopIteration:  # no more content
                handles[url].close()
                streams.pop(url)
Sample output:
rat@pandion:~/tmp$ python smu.py
Received 1296 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
Received 1882 bytes for http://www.gutenberg.org/ebooks/21497.txt.utf-8
Received 1524 bytes for http://www.gutenberg.org/files/1729/1729-0.txt
Received 1508 bytes for http://www.gutenberg.org/ebooks/21790.txt.utf-8
Received 1826 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
Received 2349 bytes for http://www.gutenberg.org/ebooks/21497.txt.utf-8
Received 1834 bytes for http://www.gutenberg.org/files/1729/1729-0.txt
Received 1838 bytes for http://www.gutenberg.org/ebooks/21790.txt.utf-8
Received 2009 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
...
You could probably achieve slightly faster performance using threads or multiprocessing, but I doubt it would be significantly better. In virtually all cases, writing your data to disk is going to be far faster than receiving it from the network.
If you're not worried about "monkey patching", you can use gevent:
import gevent.monkey
import requests

CONNECTIONS = 10

gevent.monkey.patch_all()  # debug in PyCharm: https://blog.jetbrains.com/pycharm/2012/08/gevent-debug-support/

import gevent.pool


def your_request_without_any_changes(url):
    return requests.get(url)


pool = gevent.pool.Pool(CONNECTIONS)

for response in pool.imap_unordered(your_request_without_any_changes, ['http://www.google.com'] * 100):
    print(response.status_code)
gevent uses an "event loop" and patches the requests library (actually this happens at a lower level) so that it switches to another task while we are waiting for a response.