Python multiprocess/multithreading 加快文件复制
Python multiprocess/multithreading to speed up file copying
我有一个程序可以将大量文件从一个位置复制到另一个位置 - 我说的是 100,000 多个文件(此时我正在复制 314g 的图像序列)。他们都在巨大的、非常快速的网络存储 RAID 中。我正在使用 shutil 按顺序复制文件,这需要一些时间,所以我试图找到优化它的最佳方法。我注意到一些软件我有效地使用多线程从网络读取文件,加载时间大大增加,所以我想在 python.
中尝试这样做
我没有编程经验 multithreading/multiprocessesing - 这似乎是继续进行的正确领域吗?如果是这样,最好的方法是什么?我查看了其他一些关于 python 中线程文件复制的 SO 帖子,他们似乎都说你没有速度提升,但考虑到我的硬件,我认为情况不会如此。我目前离我的 IO 上限还很远,资源大约为 1%(我在本地有 40 个内核和 64g RAM)。
这可以通过在 Python 中使用 gevent 来并行化。
我会推荐以下逻辑来实现加速 100k+ 文件复制:
将需要复制的所有100K+文件的名称放在一个csv文件中,例如:'input.csv'。
然后从该 csv 文件创建块。块的数量应根据您机器中的 no.of processors/cores 来决定。
将每个块传递给单独的线程。
每个线程依次读取该块中的文件名并将其从一个位置复制到另一个位置。
这里是 python 代码片段:
import sys
import os
import multiprocessing
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
def _copyFile(file):
# over here, you can put your own logic of copying a file from source to destination
def _worker(csv_file, chunk):
f = open(csv_file)
f.seek(chunk[0])
for file in f.read(chunk[1]).splitlines():
_copyFile(file)
def _getChunks(file, size):
f = open(file)
while 1:
start = f.tell()
f.seek(size, 1)
s = f.readline()
yield start, f.tell() - start
if not s:
f.close()
break
if __name__ == "__main__":
if(len(sys.argv) > 1):
csv_file_name = sys.argv[1]
else:
print "Please provide a csv file as an argument."
sys.exit()
no_of_procs = multiprocessing.cpu_count() * 4
file_size = os.stat(csv_file_name).st_size
file_size_per_chunk = file_size/no_of_procs
pool = Pool(no_of_procs)
for chunk in _getChunks(csv_file_name, file_size_per_chunk):
pool.apply_async(_worker, (csv_file_name, chunk))
pool.join()
将文件另存为 file_copier.py。
打开终端和 运行:
$ ./file_copier.py input.csv
更新:
我从来没有让 Gevent 工作(第一个答案),因为我无法在没有互联网连接的情况下安装模块,而我的工作站上没有互联网连接。但是,仅使用带有 python 的内置线程(我已经学会了如何使用),我就能够将文件复制时间减少 8 次,我想 post 它作为任何人的额外答案感兴趣的!下面是我的代码,可能很重要的一点是,由于您的 hardware/network 设置,我的 8 倍复制时间很可能因环境而异。
import Queue, threading, os, time
import shutil
fileQueue = Queue.Queue()
destPath = 'path/to/cop'
class ThreadedCopy:
totalFiles = 0
copyCount = 0
lock = threading.Lock()
def __init__(self):
with open("filelist.txt", "r") as txt: #txt with a file per line
fileList = txt.read().splitlines()
if not os.path.exists(destPath):
os.mkdir(destPath)
self.totalFiles = len(fileList)
print str(self.totalFiles) + " files to copy."
self.threadWorkerCopy(fileList)
def CopyWorker(self):
while True:
fileName = fileQueue.get()
shutil.copy(fileName, destPath)
fileQueue.task_done()
with self.lock:
self.copyCount += 1
percent = (self.copyCount * 100) / self.totalFiles
print str(percent) + " percent copied."
def threadWorkerCopy(self, fileNameList):
for i in range(16):
t = threading.Thread(target=self.CopyWorker)
t.daemon = True
t.start()
for fileName in fileNameList:
fileQueue.put(fileName)
fileQueue.join()
ThreadedCopy()
在重新执行代码 post 由 @Spencer 编写时,我 运行 遇到了与 post 下面的评论中提到的相同的错误(更具体地说:OSError: [Errno 24] Too many open files
).
我通过远离守护线程并使用 concurrent.futures.ThreadPoolExecutor
来解决这个问题。这似乎以更好的方式处理要复制的文件的打开和关闭。通过这样做,除了现在看起来像这样的 threadWorkerCopy(self, filename_list: List[str])
方法之外,所有代码都保持不变:
def threadWorkerCopy(self, filename_list: List[str]):
"""
This function initializes the workers to enable the multi-threaded process. The workers are handles automatically with
ThreadPoolExecutor. More infos about multi-threading can be found here: https://realpython.com/intro-to-python-threading/.
A recurrent problem with the threading here was "OSError: [Errno 24] Too many open files". This was coming from the fact
that deamon threads were not killed before the end of the script. Therefore, everything opened by them was never closed.
Args:
filename_list (List[str]): List containing the name of the files to copy.
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
executor.submit(self.CopyWorker)
for filename in filename_list:
self.file_queue.put(filename)
self.file_queue.join() # program waits for this process to be done.
使用 ThreadPool
怎么样?
import os
import glob
import shutil
from functools import partial
from multiprocessing.pool import ThreadPool
DST_DIR = '../path/to/new/dir'
SRC_DIR = '../path/to/files/to/copy'
# copy_to_mydir will copy any file you give it to DST_DIR
copy_to_mydir = partial(shutil.copy, dst=DST_DIR)
# list of files we want to copy
to_copy = glob.glob(os.path.join(SRC_DIR, '*'))
with ThreadPool(4) as p:
p.map(copy_to_mydir, to_copy)
如果您只想将目录树从一个路径复制到另一个路径,这是我的解决方案,它比以前的解决方案简单一点。它利用 multiprocessing.pool.ThreadPool
并为 shutil.copytree
:
使用自定义复制函数
import shutil
from multiprocessing.pool import ThreadPool
class MultithreadedCopier:
def __init__(self, max_threads):
self.pool = ThreadPool(max_threads)
def copy(self, source, dest):
self.pool.apply_async(shutil.copy2, args=(source, dest))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.pool.close()
self.pool.join()
src_dir = "/path/to/src/dir"
dest_dir = "/path/to/dest/dir"
with MultithreadedCopier(max_threads=16) as copier:
shutil.copytree(src_dir, dest_dir, copy_function=copier.copy)
我有一个程序可以将大量文件从一个位置复制到另一个位置 - 我说的是 100,000 多个文件(此时我正在复制 314g 的图像序列)。他们都在巨大的、非常快速的网络存储 RAID 中。我正在使用 shutil 按顺序复制文件,这需要一些时间,所以我试图找到优化它的最佳方法。我注意到一些软件我有效地使用多线程从网络读取文件,加载时间大大增加,所以我想在 python.
中尝试这样做我没有编程经验 multithreading/multiprocessesing - 这似乎是继续进行的正确领域吗?如果是这样,最好的方法是什么?我查看了其他一些关于 python 中线程文件复制的 SO 帖子,他们似乎都说你没有速度提升,但考虑到我的硬件,我认为情况不会如此。我目前离我的 IO 上限还很远,资源大约为 1%(我在本地有 40 个内核和 64g RAM)。
这可以通过在 Python 中使用 gevent 来并行化。
我会推荐以下逻辑来实现加速 100k+ 文件复制:
将需要复制的所有100K+文件的名称放在一个csv文件中,例如:'input.csv'。
然后从该 csv 文件创建块。块的数量应根据您机器中的 no.of processors/cores 来决定。
将每个块传递给单独的线程。
每个线程依次读取该块中的文件名并将其从一个位置复制到另一个位置。
这里是 python 代码片段:
import sys
import os
import multiprocessing
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
def _copyFile(file):
# over here, you can put your own logic of copying a file from source to destination
def _worker(csv_file, chunk):
f = open(csv_file)
f.seek(chunk[0])
for file in f.read(chunk[1]).splitlines():
_copyFile(file)
def _getChunks(file, size):
f = open(file)
while 1:
start = f.tell()
f.seek(size, 1)
s = f.readline()
yield start, f.tell() - start
if not s:
f.close()
break
if __name__ == "__main__":
if(len(sys.argv) > 1):
csv_file_name = sys.argv[1]
else:
print "Please provide a csv file as an argument."
sys.exit()
no_of_procs = multiprocessing.cpu_count() * 4
file_size = os.stat(csv_file_name).st_size
file_size_per_chunk = file_size/no_of_procs
pool = Pool(no_of_procs)
for chunk in _getChunks(csv_file_name, file_size_per_chunk):
pool.apply_async(_worker, (csv_file_name, chunk))
pool.join()
将文件另存为 file_copier.py。 打开终端和 运行:
$ ./file_copier.py input.csv
更新:
我从来没有让 Gevent 工作(第一个答案),因为我无法在没有互联网连接的情况下安装模块,而我的工作站上没有互联网连接。但是,仅使用带有 python 的内置线程(我已经学会了如何使用),我就能够将文件复制时间减少 8 次,我想 post 它作为任何人的额外答案感兴趣的!下面是我的代码,可能很重要的一点是,由于您的 hardware/network 设置,我的 8 倍复制时间很可能因环境而异。
import Queue, threading, os, time
import shutil
fileQueue = Queue.Queue()
destPath = 'path/to/cop'
class ThreadedCopy:
totalFiles = 0
copyCount = 0
lock = threading.Lock()
def __init__(self):
with open("filelist.txt", "r") as txt: #txt with a file per line
fileList = txt.read().splitlines()
if not os.path.exists(destPath):
os.mkdir(destPath)
self.totalFiles = len(fileList)
print str(self.totalFiles) + " files to copy."
self.threadWorkerCopy(fileList)
def CopyWorker(self):
while True:
fileName = fileQueue.get()
shutil.copy(fileName, destPath)
fileQueue.task_done()
with self.lock:
self.copyCount += 1
percent = (self.copyCount * 100) / self.totalFiles
print str(percent) + " percent copied."
def threadWorkerCopy(self, fileNameList):
for i in range(16):
t = threading.Thread(target=self.CopyWorker)
t.daemon = True
t.start()
for fileName in fileNameList:
fileQueue.put(fileName)
fileQueue.join()
ThreadedCopy()
在重新执行代码 post 由 @Spencer 编写时,我 运行 遇到了与 post 下面的评论中提到的相同的错误(更具体地说:OSError: [Errno 24] Too many open files
).
我通过远离守护线程并使用 concurrent.futures.ThreadPoolExecutor
来解决这个问题。这似乎以更好的方式处理要复制的文件的打开和关闭。通过这样做,除了现在看起来像这样的 threadWorkerCopy(self, filename_list: List[str])
方法之外,所有代码都保持不变:
def threadWorkerCopy(self, filename_list: List[str]):
"""
This function initializes the workers to enable the multi-threaded process. The workers are handles automatically with
ThreadPoolExecutor. More infos about multi-threading can be found here: https://realpython.com/intro-to-python-threading/.
A recurrent problem with the threading here was "OSError: [Errno 24] Too many open files". This was coming from the fact
that deamon threads were not killed before the end of the script. Therefore, everything opened by them was never closed.
Args:
filename_list (List[str]): List containing the name of the files to copy.
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
executor.submit(self.CopyWorker)
for filename in filename_list:
self.file_queue.put(filename)
self.file_queue.join() # program waits for this process to be done.
使用 ThreadPool
怎么样?
import os
import glob
import shutil
from functools import partial
from multiprocessing.pool import ThreadPool
DST_DIR = '../path/to/new/dir'
SRC_DIR = '../path/to/files/to/copy'
# copy_to_mydir will copy any file you give it to DST_DIR
copy_to_mydir = partial(shutil.copy, dst=DST_DIR)
# list of files we want to copy
to_copy = glob.glob(os.path.join(SRC_DIR, '*'))
with ThreadPool(4) as p:
p.map(copy_to_mydir, to_copy)
如果您只想将目录树从一个路径复制到另一个路径,这是我的解决方案,它比以前的解决方案简单一点。它利用 multiprocessing.pool.ThreadPool
并为 shutil.copytree
:
import shutil
from multiprocessing.pool import ThreadPool
class MultithreadedCopier:
def __init__(self, max_threads):
self.pool = ThreadPool(max_threads)
def copy(self, source, dest):
self.pool.apply_async(shutil.copy2, args=(source, dest))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.pool.close()
self.pool.join()
src_dir = "/path/to/src/dir"
dest_dir = "/path/to/dest/dir"
with MultithreadedCopier(max_threads=16) as copier:
shutil.copytree(src_dir, dest_dir, copy_function=copier.copy)