How can I create parallel processing so that multiple processes download and write at the same time?
I wrote a multi-process downloader for large ISO files. The idea is to cut the ISO into 4 parts using Range request headers and open 4 processes to download them.
import requests
import multiprocessing

class my_download(object):
    def __init__(self, url):
        self.url = url
        self.process_num = multiprocessing.cpu_count()
        self.fn = url.split('/')[-1]
        url_headers = requests.head(self.url)
        self.total = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()

    def get_file_range(self):
        ranges = []
        download_num = int(self.total/self.process_num)
        for i in range(self.process_num):
            if i == self.process_num-1:
                ranges.append((download_num*i, ''))
            else:
                ranges.append((download_num*i, download_num*(i+1)))
        return ranges

    def run_task(self, i):
        print('process {} start'.format(str(i)))
        fn = '/tmp/' + self.fn + "-" + str(i)
        headers = {'Range': 'Bytes=%s-%s' % self.ranges[i], 'Accept-Encoding': '*'}
        r = requests.get(self.url, headers=headers, stream=True)
        with open(fn, 'wb') as fh:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    fh.write(chunk)
        print('process {} end'.format(str(i)))

    def run(self):
        pool = multiprocessing.Pool(processes=self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task, args=(i,))
        pool.close()
        pool.join()

url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()
The whole ISO is downloaded as 4 parts, which I then have to concatenate into one file.
Merging all 4 downloaded parts into the same file with the following code is inefficient:
flist = ['/tmp/' + self.fn + "-" + str(i) for i in range(4)]
with open("/tmp/" + self.fn, 'wb') as newf:
    for filename in flist:
        with open(filename, 'rb') as hf:
            newf.write(hf.read())
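If the merge step is kept, a gentler variant (a sketch, not the original code) streams each part in fixed-size buffers with shutil.copyfileobj instead of pulling a ~1 GB part into memory with hf.read(). Note also that HTTP ranges are inclusive at both ends, so the ranges produced above overlap by one byte at each boundary and plain concatenation duplicates those bytes; exact parts would use download_num*(i+1)-1 as the end offset.

import shutil

# Sketch: stream each ~1 GB part in 1 MiB buffers instead of loading
# an entire part into memory at once with hf.read().
flist = ['/tmp/' + self.fn + "-" + str(i) for i in range(4)]
with open("/tmp/" + self.fn, 'wb') as newf:
    for filename in flist:
        with open(filename, 'rb') as hf:
            shutil.copyfileobj(hf, newf, length=1024 * 1024)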
How can multiple processes write into the same file?
I can prepare a blank file in __init__ whose size equals that of the resource:
self.fh = open(self.fn,'wb')
self.fh.seek(self.size-1)
self.fh.write(b'\0')
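The same preallocation can be written more directly (a sketch, assuming a POSIX filesystem where the extended region reads back as null bytes) with truncate():

# Sketch: preallocate the output file to the full download size.
# truncate() extends the file, equivalent to the
# seek(size - 1) / write(b'\0') idiom above.
with open(self.fn, 'wb') as fh:
    fh.truncate(self.size)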
A hard part remains. The total size is 3947823104 bytes, and the program cuts it into 4 ranges:
ranges
[(0, 986955776), (986955776, 1973911552), (1973911552, 2960867328), (2960867328, '')]
The content belonging to each range should then be written, chunk by chunk, into the blank file at its designated position.
I tried setting the file pointer with seek(ranges[i][0]) so that the ith process writes the content it downloads from the resource. Here is my unsuccessful attempt:
import requests, os
import multiprocessing

class my_download(object):
    def __init__(self, url):
        self.url = url
        self.process_num = multiprocessing.cpu_count()
        self.fn = url.split('/')[-1]
        url_headers = requests.head(self.url)
        self.size = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()
        self.fh = open(self.fn, 'wb')
        self.fh.seek(self.size-1)
        self.fh.write(b'\0')
        self.fh.flush()

    def get_file_range(self):
        ranges = []
        download_num = int(self.size/self.process_num)
        for i in range(self.process_num):
            if i == self.process_num-1:
                ranges.append((download_num*i, ''))
            else:
                ranges.append((download_num*i, download_num*(i+1)))
        return ranges

    def run_task(self, i):
        print('process {} start'.format(str(i)))
        headers = {'Range': 'Bytes=%s-%s' % self.ranges[i], 'Accept-Encoding': '*'}
        r = requests.get(self.url, headers=headers, stream=True)
        self.fh.seek(self.ranges[i][0])
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                self.fh.write(chunk)
        self.fh.flush()
        print('process {} end'.format(str(i)))

    def run(self):
        pool = multiprocessing.Pool(processes=self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task, args=(i,))
        pool.close()
        pool.join()
        self.fh.close()

url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()
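Two things make the attempt above fail silently: apply_async pickles self.run_task (and therefore self) to send it to a worker, and an instance holding an open file object cannot be pickled, so every task fails; and since get() is never called on the AsyncResult objects, those failures are discarded. Even with a shareable handle, all workers seeking and writing on one shared file offset would race. One possible fix, as an illustrative sketch only (POSIX-only, assuming self.fn has already been preallocated to self.size): have each worker open its own descriptor and write at absolute offsets with os.pwrite:

import os
import requests

# Sketch (not the original code): each worker opens its own descriptor
# and writes with os.pwrite, which takes an explicit offset and never
# moves a shared file position. Assumes self.fn was preallocated to
# self.size and that we are on a POSIX platform.
def run_task(self, i):
    start, end = self.ranges[i]
    headers = {'Range': 'bytes=%s-%s' % (start, end), 'Accept-Encoding': '*'}
    r = requests.get(self.url, headers=headers, stream=True)
    fd = os.open(self.fn, os.O_WRONLY)
    try:
        offset = start
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                os.pwrite(fd, chunk, offset)
                offset += len(chunk)
    finally:
        os.close(fd)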
How can I create perfect parallel processing in which multiple processes download and write at the same time?
You can probably achieve some improvement by downloading the file segments concurrently, and multithreading should be sufficient for that. The number of threads to use is something to experiment with, to see how it affects not only network performance but also the concurrent writing of the disk output. For the latter, @showkey's suggestion to use a memory-mapped file is a good one, but to use multiprocessing with a memory-mapped file you need to be on a Linux-like platform, which I am guessing you might be running on (you should tag your question with the platform whenever you tag it with multiprocessing, just so I don't have to guess). On Windows, however, a memory map cannot be shared across processes, so there you are forced to use threading, and that is what I will use here. Since a thread retrieves its segment in chunks, it will most likely release the GIL periodically and allow the other threads to run. Of course, if you are running under Linux, it is just a matter of changing the import statement (it should be self-explanatory), although, again, it is not clear whether the most efficient pool size is the number of cores you have.
By using a memory-mapped file, no final merging of the parts is required. In method __init__, you should set self.fn to whatever final filename you want.
import requests, os
# This uses multithreading:
#from multiprocessing.dummy import Pool
# This uses multiprocessing:
from multiprocessing.pool import Pool
import sys
import mmap
import time

#NUM_PROCESSES = os.cpu_count() # You must experiment with this
NUM_PROCESSES = 16

def init_pool(the_mm):
    global mm
    mm = the_mm

class my_download(object):
    def __init__(self, url):
        self.url = url
        self.process_num = NUM_PROCESSES
        self.fn = url.split('/')[-1] # Or whatever final filename you want
        url_headers = requests.head(self.url)
        self.size = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()

    def get_file_range(self):
        ranges = []
        download_num = int(self.size/self.process_num)
        for i in range(self.process_num):
            if i == self.process_num-1:
                ranges.append((download_num*i, ''))
            else:
                ranges.append((download_num*i, download_num*(i+1)))
        print('ranges:', ranges)
        return ranges

    def run_task(self, i):
        print('process {} start'.format(str(i)))
        headers = {'Range': 'Bytes=%s-%s' % self.ranges[i], 'Accept-Encoding': '*'}
        print(headers, flush=True)
        r = requests.get(self.url, headers=headers, stream=True)
        offset = self.ranges[i][0]
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                size = len(chunk)
                mm[offset:offset+size] = chunk
                offset += size
        print('process {} end'.format(str(i)))

    def run(self):
        with open(self.fn, 'wb') as f:
            if sys.platform != 'win32':
                # If not Windows then memory-mapped file size cannot be greater than disk file size:
                f.seek(self.size - 1)
                f.write(b'\0') # But for Windows a 1-byte file will suffice
        # Re-open:
        with open(self.fn, 'rb+') as f:
            # Create memory-mapped file on top of disk file:
            with mmap.mmap(f.fileno(), self.size) as mm:
                pool = Pool(processes=self.process_num, initializer=init_pool, initargs=(mm,))
                for i in range(self.process_num):
                    pool.apply_async(self.run_task, args=(i,))
                pool.close()
                pool.join()

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()
print(time.time() - start)
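One caveat, added here rather than part of the original answer: apply_async silently discards any exception raised in a worker (for example, a failed HTTP request) unless you keep the AsyncResult it returns and call get() on it. The run loop could collect the results like this:

# Sketch: keep the AsyncResult objects so that any exception raised
# in a worker is re-raised here instead of being silently discarded.
results = [pool.apply_async(self.run_task, args=(i,))
           for i in range(self.process_num)]
pool.close()
pool.join()
for result in results:
    result.get()  # re-raises any worker exception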
The following program, which uses a memory-mapped file but downloads the file with a single request, took 157 seconds, while the program above took 85 seconds on my desktop:
import mmap
import requests
import sys
import time

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]
url_headers = requests.head(url)
size = int(url_headers.headers['Content-Length'])
with open(fn, 'wb') as f:
    if sys.platform != 'win32':
        # If not Windows then memory-mapped file size cannot be greater than disk file size:
        f.seek(size - 1)
        f.write(b'\0') # But for Windows a 1-byte file will suffice
# Reopen:
with open(fn, 'rb+') as f:
    # Memory-map the whole file on top of the disk file:
    with mmap.mmap(f.fileno(), length=size) as mm:
        r = requests.get(url, stream=True)
        offset = 0
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                size = len(chunk)
                mm[offset:offset+size] = chunk
                offset += size
print(time.time() - start)
The following program downloads the file with a single request and does not use a memory-mapped file to write the output; it took 239 seconds.
import requests
import time

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]
with open(fn, 'wb') as f:
    r = requests.get(url, stream=True)
    for chunk in r.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)
print(time.time() - start)
Summary:
Download file in 16 segments using memory-mapped file: 85 seconds
Download file in 1 segment using memory-mapped file: 157 seconds
Download file in 1 segment without using memory-mapped file: 239 seconds.
Besides trying different values for NUM_PROCESSES, you might also want to experiment with increasing the chunk_size argument used with the iter_content method, although it may have no effect.
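A rough way to run that chunk_size experiment (a sketch, not from the answer; it re-downloads the whole file once per setting, so substitute a smaller test URL before running it as-is):

import time
import requests

# Sketch: crude timing comparison of chunk sizes for a single streamed
# request. Re-downloads the entire file for every setting.
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
for chunk_size in (1024, 64 * 1024, 1024 * 1024):
    start = time.time()
    total = 0
    r = requests.get(url, stream=True)
    for chunk in r.iter_content(chunk_size=chunk_size):
        total += len(chunk)
    print('chunk_size=%d: %d bytes in %.1f s' % (chunk_size, total, time.time() - start))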