How can I create parallel processing so that multiple processes download and write at the same time?

I wrote a multi-process downloader for large ISO files. The idea is to split the ISO into four parts by requesting it with Range headers and to start four processes to download them.

import requests
import multiprocessing

class my_download(object):
    def __init__(self, url):
        self.url = url
        self.process_num = multiprocessing.cpu_count()
        self.fn = url.split('/')[-1]
        # A HEAD request gives the total size without downloading the body:
        url_headers = requests.head(self.url)
        self.total = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()

    def get_file_range(self):
        ranges = []
        download_num = int(self.total / self.process_num)
        for i in range(self.process_num):
            if i == self.process_num - 1:
                # The last segment is open-ended to absorb any remainder.
                ranges.append((download_num * i, ''))
            else:
                # Note: HTTP Range end offsets are inclusive, so adjacent
                # segments as computed here overlap by one byte.
                ranges.append((download_num * i, download_num * (i + 1)))
        return ranges

    def run_task(self, i):
        print('process {} start'.format(i))
        fn = '/tmp/' + self.fn + "-" + str(i)
        headers = {'Range': 'Bytes=%s-%s' % self.ranges[i], 'Accept-Encoding': '*'}
        r = requests.get(self.url, headers=headers, stream=True)
        with open(fn, 'wb') as fh:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    fh.write(chunk)
        print('process {} end'.format(i))

    def run(self):
        pool = multiprocessing.Pool(processes=self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task, args=(i,))
        pool.close()
        pool.join()


url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()

The whole ISO is downloaded as four parts, and I have to join them into one file.

Merging all four downloaded parts into the same file with the following code is inefficient:

    flist = ['/tmp/' + self.fn + "-" + str(i) for i in range(4)]
    with open("/tmp/" + self.fn, 'wb') as newf:
        for filename in flist:
            with open(filename, 'rb') as hf:
                newf.write(hf.read())
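
A chunked copy keeps memory usage flat instead of pulling each ~1 GB part into memory with hf.read(); the standard library's shutil.copyfileobj does exactly that. A sketch of the same merge (same names as above):

    import shutil

    flist = ['/tmp/' + self.fn + "-" + str(i) for i in range(4)]
    with open("/tmp/" + self.fn, 'wb') as newf:
        for filename in flist:
            with open(filename, 'rb') as hf:
                # Stream in 1 MiB chunks rather than reading a whole part at once:
                shutil.copyfileobj(hf, newf, length=1024 * 1024)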

How can multiple processes write to the same file?
I could prepare a blank file in __init__ whose size equals that of the resource:
    self.fh = open(self.fn, 'wb')
    self.fh.seek(self.size - 1)
    self.fh.write(b'\0')
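
For reference, truncate() does the same pre-allocation in a single call; on most filesystems it creates a sparse file of null bytes. A sketch using the same attributes:

    # Equivalent pre-allocation: extend the empty file to its final size.
    self.fh = open(self.fn, 'wb')
    self.fh.truncate(self.size)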

One hard job remains. The total size is 3947823104 bytes, and the program cuts it into four ranges:

ranges
[(0, 986955776), (986955776, 1973911552), (1973911552, 2960867328), (2960867328, '')]

Each process should write the content belonging to its range into the blank file at the proper offset, chunk by chunk. I tried setting the file pointer with seek(ranges[i][0]) so that the i-th process writes the content it downloads from the resource. Here is my unsuccessful attempt:

import requests
import multiprocessing

class my_download(object):
    def __init__(self, url):
        self.url = url
        self.process_num = multiprocessing.cpu_count()
        self.fn = url.split('/')[-1]
        url_headers = requests.head(self.url)
        self.size = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()
        # Pre-allocate the output file by writing a single null byte at the end:
        self.fh = open(self.fn, 'wb')
        self.fh.seek(self.size - 1)
        self.fh.write(b'\0')
        self.fh.flush()

    def get_file_range(self):
        ranges = []
        download_num = int(self.size / self.process_num)
        for i in range(self.process_num):
            if i == self.process_num - 1:
                ranges.append((download_num * i, ''))
            else:
                ranges.append((download_num * i, download_num * (i + 1)))
        return ranges

    def run_task(self, i):
        print('process {} start'.format(i))
        headers = {'Range': 'Bytes=%s-%s' % self.ranges[i], 'Accept-Encoding': '*'}
        r = requests.get(self.url, headers=headers, stream=True)
        self.fh.seek(self.ranges[i][0])
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                self.fh.write(chunk)
        self.fh.flush()
        print('process {} end'.format(i))

    def run(self):
        pool = multiprocessing.Pool(processes=self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task, args=(i,))
        pool.close()
        pool.join()
        self.fh.close()


url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()
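
The most likely reason this attempt fails silently is that apply_async has to pickle self to send run_task to the workers, and the open file object in self.fh cannot be pickled; because .get() is never called on the AsyncResult, the error is simply swallowed. One workaround (a sketch, not the approach taken in the answer below) is to keep no handle on self at all and let each worker open and position its own:

# In __init__, pre-allocate and close immediately so self stays picklable:
#     with open(self.fn, 'wb') as fh:
#         fh.seek(self.size - 1)
#         fh.write(b'\0')

def run_task(self, i):
    print('process {} start'.format(i))
    headers = {'Range': 'Bytes=%s-%s' % self.ranges[i], 'Accept-Encoding': '*'}
    r = requests.get(self.url, headers=headers, stream=True)
    # Each worker opens its own handle, so the processes neither share a
    # file offset nor need to inherit anything unpicklable.
    with open(self.fn, 'rb+') as fh:
        fh.seek(self.ranges[i][0])
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                fh.write(chunk)
    print('process {} end'.format(i))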

How can I create proper parallel processing so that multiple processes download and write at the same time?

You can probably achieve some improvement by downloading the file segments concurrently, and for that multithreading should be sufficient. The number of threads to use is something to experiment with, considering not only how it affects network performance but also the concurrency of the disk writes. For the latter, @showkey's suggestion of using a memory-mapped file is a good one, but to use multiprocessing with a memory-mapped file you need to be on a Linux-like platform, which I am guessing you may be running on (you should tag your question with the platform whenever you tag it with multiprocessing, just so I don't have to guess). On Windows, however, a memory map cannot be shared across processes, so you are forced to use threads, and that is what I will use here. And since a thread retrieves its segment in chunks, it will most likely release the GIL periodically and allow the other threads to run. Of course, if you are running under Linux, it is just a matter of changing the import statement (which should be self-explanatory), although, again, it is not clear that the most efficient pool size equals the number of cores you have.
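
If you would rather not edit the import by hand, a small platform check (my addition, not in the code below) selects threads on Windows and processes elsewhere; both classes expose the same Pool interface:

import sys

if sys.platform == 'win32':
    # Windows: a memory map cannot be shared across processes, so use threads.
    from multiprocessing.dummy import Pool
else:
    # POSIX: worker processes created by fork inherit the memory map.
    from multiprocessing.pool import Pool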

By using a memory-mapped file, no final merging of the pieces is required. In method __init__, you should set self.fn to whatever final filename you want.

import requests, os
# This uses multithreading:
from multiprocessing.dummy import Pool
# This uses multiprocessing:
#from multiprocessing.pool import Pool
import sys
import mmap
import time

#NUM_PROCESSES = os.cpu_count() # You must experiment with this
NUM_PROCESSES = 16

def init_pool(the_mm):
    # Make the memory-mapped file available to every pool worker as a global.
    global mm
    mm = the_mm

class my_download(object):
    def __init__(self, url):
        self.url = url
        self.process_num = NUM_PROCESSES
        self.fn = url.split('/')[-1] # Or whatever final filename you want
        url_headers = requests.head(self.url)
        self.size = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()

    def get_file_range(self):
        ranges = []
        download_num = int(self.size / self.process_num)
        for i in range(self.process_num):
            if i == self.process_num - 1:
                ranges.append((download_num * i, ''))
            else:
                ranges.append((download_num * i, download_num * (i + 1)))
        print('ranges:', ranges)
        return ranges

    def run_task(self, i):
        print('process {} start'.format(i))
        headers = {'Range': 'Bytes=%s-%s' % self.ranges[i], 'Accept-Encoding': '*'}
        print(headers, flush=True)
        r = requests.get(self.url, headers=headers, stream=True)
        offset = self.ranges[i][0]
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                size = len(chunk)
                mm[offset:offset + size] = chunk
                offset += size
        print('process {} end'.format(i))

    def run(self):
        with open(self.fn, 'wb') as f:
            if sys.platform != 'win32':
                # If not Windows then memory-mapped file size cannot be greater than disk file size:
                f.seek(self.size - 1)
            f.write(b'\0') # But for Windows a 1-byte file will suffice
        # Re-open:
        with open(self.fn, 'rb+') as f:
            # Create memory-mapped file on top of disk file:
            with mmap.mmap(f.fileno(), self.size) as mm:
                pool = Pool(processes=self.process_num, initializer=init_pool, initargs=(mm,))
                for i in range(self.process_num):
                    pool.apply_async(self.run_task, args=(i,))
                pool.close()
                pool.join()

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()
print(time.time() - start)
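
As a quick sanity check (my addition, not part of the timing runs), the finished file should be exactly Content-Length bytes:

import os
import requests

url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]

expected = int(requests.head(url).headers['Content-Length'])
actual = os.path.getsize(fn)
# A mismatch would mean a segment was dropped or written at the wrong offset.
print('size ok' if actual == expected else 'size mismatch: {} != {}'.format(actual, expected))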

The following program, which uses a memory-mapped file but downloads the file with a single request, took 157 seconds, whereas the program above took 85 seconds on my desktop:

import mmap
import requests
import sys
import time

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]
url_headers = requests.head(url)
size = int(url_headers.headers['Content-Length'])
with open(fn, 'wb') as f:
    if sys.platform != 'win32':
        # If not Windows then memory-mapped file size cannot be greater than disk file size:
        f.seek(size - 1)
    f.write(b'\0') # But for Windows a 1-byte file will suffice
# Reopen:
with open(fn, 'rb+') as f:
    # Memory-map the whole file on top of the disk file:
    with mmap.mmap(f.fileno(), length=size) as mm:
        r = requests.get(url, stream=True)
        offset = 0
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                n = len(chunk)
                mm[offset:offset + n] = chunk
                offset += n
print(time.time() - start)

The following program downloads the file with a single request and writes the output without a memory-mapped file; it took 239 seconds:

import requests
import time

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]
with open(fn, 'wb') as f:
    r = requests.get(url, stream=True)
    for chunk in r.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)
print(time.time() - start)

Summary:

Download file in 16 segments using memory-mapped file: 85 seconds
Download file in 1 segment using memory-mapped file: 157 seconds
Download file in 1 segment without using memory-mapped file: 239 seconds

Besides trying different values of NUM_PROCESSES, you might also want to try increasing the chunk_size argument used with the iter_content method, although it may have no effect.
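
A throwaway loop like the following is enough to measure the effect (my sketch; it re-downloads the file once per value, so it takes a while):

import time
import requests

url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"

# Time a single-request download for several chunk sizes, discarding the data.
for chunk_size in (1024, 8192, 65536, 1024 * 1024):
    start = time.time()
    r = requests.get(url, stream=True)
    total = sum(len(chunk) for chunk in r.iter_content(chunk_size=chunk_size))
    print(chunk_size, total, time.time() - start)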