Python 递归多处理 - 线程太多
Python Recursive Multiprocessing - too many threads
背景:
Python3.5.1,Windows7
我有一个包含大量文件和目录的网络驱动器。我正在尝试编写一个脚本来尽快解析所有这些文件,以找到与 RegEx 匹配的所有文件,并将这些文件复制到我的本地 PC 以供查看。大约有3500个目录和子目录,以及几百万个文件。我试图使它尽可能通用(即,不将代码写入这个确切的文件结构)以便将其重新用于其他网络驱动器。我的代码在 运行 针对小型网络驱动器时有效,这里的问题似乎是可伸缩性。
我已经尝试了一些使用多处理库的东西,但似乎无法让它可靠地工作。我的想法是创建一个新作业来解析每个子目录以尽快工作。我有一个递归函数,它解析目录中的所有对象,然后为任何子目录调用自身,并根据 RegEx 检查它找到的任何文件。
问题:如何在不使用池的情况下限制threads/processes的数量来达到我的目标?
我试过的:
- 如果我只使用进程作业,我会在超过几百个线程启动后收到错误
RuntimeError: can't start new thread
,并开始断开连接。我最终找到了大约一半的文件,因为一半的目录出错了(下面的代码)。
- 为了限制总线程数,我尝试使用Pool方法,但是我无法根据this question将池对象传递给被调用的方法,这使得递归实现无法实现。
- 为了解决这个问题,我尝试在 Pool 方法中调用 Processes,但出现错误
daemonic processes are not allowed to have children
。
- 我认为如果我可以限制并发线程数,那么我的解决方案将按设计工作。
代码:
import os
import re
import shutil
from multiprocessing import Process, Manager
CheckLocations = ['network drive location 1', 'network drive location 2']
SaveLocation = 'local PC location'
FileNameRegex = re.compile('RegEx here', flags = re.IGNORECASE)
# Loop through all items in folder, and call itself for subfolders.
def ParseFolderContents(path, DebugFileList):
FolderList = []
jobs = []
TempList = []
if not os.path.exists(path):
return
try:
for item in os.scandir(path):
try:
if item.is_dir():
p = Process(target=ParseFolderContents, args=(item.path, DebugFileList))
jobs.append(p)
p.start()
elif FileNameRegex.search(item.name) != None:
DebugFileList.append((path, item.name))
else:
pass
except Exception as ex:
if hasattr(ex, 'message'):
print(ex.message)
else:
print(ex)
# print('Error in file:\t' + item.path)
except Exception as ex:
if hasattr(ex, 'message'):
print(ex.message)
else:
print('Error in path:\t' + path)
pass
else:
print('\tToo many threads to restart directory.')
for job in jobs:
job.join()
# Save list of debug files.
def SaveDebugFiles(DebugFileList):
for file in DebugFileList:
try:
shutil.copyfile(file[0] + '\' + file[1], SaveLocation + file[1])
except PermissionError:
continue
if __name__ == '__main__':
with Manager() as manager:
# Iterate through all directories to make a list of all desired files.
DebugFileList = manager.list()
jobs = []
for path in CheckLocations:
p = Process(target=ParseFolderContents, args=(path, DebugFileList))
jobs.append(p)
p.start()
for job in jobs:
job.join()
print('\n' + str(len(DebugFileList)) + ' files found.\n')
if len(DebugFileList) == 0:
quit()
# Iterate through all debug files and copy them to local PC.
n = 25 # Number of files to grab for each parallel path.
TempList = [DebugFileList[i:i + n] for i in range(0, len(DebugFileList), n)] # Split list into small chunks.
jobs = []
for item in TempList:
p = Process(target=SaveDebugFiles, args=(item, ))
jobs.append(p)
p.start()
for job in jobs:
job.join()
不要轻视池的用处,尤其是当您想控制要创建的进程数时。他们还负责管理您的员工(create/start/join/distribute 大块工作)并帮助您收集潜在结果。
正如您自己意识到的那样,您创建了太多进程,以至于您似乎耗尽了如此多的系统资源以致于无法创建更多进程。
此外,代码中新进程的创建受外部因素控制,即文件树中的文件夹数量,这使得限制进程数量变得非常困难。此外,创建新进程会在 OS 上带来相当大的开销,您甚至可能最终将这些开销浪费在空目录上。另外,进程之间的上下文切换非常昂贵。
根据您创建的进程数量,给定您声明的文件夹数量,您的进程基本上只会坐在那里,大部分时间都处于空闲状态,同时等待一部分 CPU 时间来实际执行做一些工作。对于所述 CPU 时间会有很多争论,除非你有一台拥有数千个内核的超级计算机供你使用。即使一个进程有一些 CPU 的时间来工作,它也可能会花费相当多的时间等待 I/O。
话虽这么说,您可能想要研究使用线程来完成这样的任务。你可以在你的代码中做一些优化。从您的示例中,我看不出您为什么要拆分识别要复制的文件并将它们实际复制到不同的任务中。为什么不让您的员工立即复制他们发现与 RE 匹配的每个文件?
我将从主线程使用 os.walk
(我认为相当快)在相关目录中创建一个文件列表,然后将该列表卸载到一个工作池中,该工作池检查这些文件立即匹配并复制:
import os
import re
from multiprocessing.pool import ThreadPool
search_dirs = ["dir 1", "dir2"]
ptn = re.compile(r"your regex")
# your target dir definition
file_list = []
for topdir in search_dirs:
for root, dirs, files in os.walk(topdir):
for file in files:
file_list.append(os.path.join(root, file))
def copier(path):
if ptn.match(path):
# do your shutil.copyfile with the try-except right here
# obviously I did not want to start mindlessly copying around files on my box :)
return path
with ThreadPool(processes=10) as pool:
results = pool.map(copier, file_list)
# print all the processed files. For those that did not match, None is returned
print("\n".join([r for r in results if r]))
附带说明:不要手动连接路径 (file[0] + "\" + file[1]
),而是为此使用 os.path.join
。
我无法让它完全按照我的期望工作。 os.walk 很慢,我想到的所有其他方法要么速度相似,要么由于线程太多而崩溃。
我最终使用了我在上面发布的类似方法,但不是从顶级目录开始递归,而是向下一级或二级目录,直到有几个目录。然后它将依次在这些目录中的每一个开始递归,这限制了足以成功完成的线程数。执行时间类似于 os.walk,这可能会使实现更简单、更易读。
背景:
Python3.5.1,Windows7
我有一个包含大量文件和目录的网络驱动器。我正在尝试编写一个脚本来尽快解析所有这些文件,以找到与 RegEx 匹配的所有文件,并将这些文件复制到我的本地 PC 以供查看。大约有3500个目录和子目录,以及几百万个文件。我试图使它尽可能通用(即,不将代码写入这个确切的文件结构)以便将其重新用于其他网络驱动器。我的代码在 运行 针对小型网络驱动器时有效,这里的问题似乎是可伸缩性。
我已经尝试了一些使用多处理库的东西,但似乎无法让它可靠地工作。我的想法是创建一个新作业来解析每个子目录以尽快工作。我有一个递归函数,它解析目录中的所有对象,然后为任何子目录调用自身,并根据 RegEx 检查它找到的任何文件。
问题:如何在不使用池的情况下限制threads/processes的数量来达到我的目标?
我试过的:
- 如果我只使用进程作业,我会在超过几百个线程启动后收到错误
RuntimeError: can't start new thread
,并开始断开连接。我最终找到了大约一半的文件,因为一半的目录出错了(下面的代码)。 - 为了限制总线程数,我尝试使用Pool方法,但是我无法根据this question将池对象传递给被调用的方法,这使得递归实现无法实现。
- 为了解决这个问题,我尝试在 Pool 方法中调用 Processes,但出现错误
daemonic processes are not allowed to have children
。 - 我认为如果我可以限制并发线程数,那么我的解决方案将按设计工作。
代码:
import os
import re
import shutil
from multiprocessing import Process, Manager
CheckLocations = ['network drive location 1', 'network drive location 2']
SaveLocation = 'local PC location'
FileNameRegex = re.compile('RegEx here', flags = re.IGNORECASE)
# Loop through all items in folder, and call itself for subfolders.
def ParseFolderContents(path, DebugFileList):
FolderList = []
jobs = []
TempList = []
if not os.path.exists(path):
return
try:
for item in os.scandir(path):
try:
if item.is_dir():
p = Process(target=ParseFolderContents, args=(item.path, DebugFileList))
jobs.append(p)
p.start()
elif FileNameRegex.search(item.name) != None:
DebugFileList.append((path, item.name))
else:
pass
except Exception as ex:
if hasattr(ex, 'message'):
print(ex.message)
else:
print(ex)
# print('Error in file:\t' + item.path)
except Exception as ex:
if hasattr(ex, 'message'):
print(ex.message)
else:
print('Error in path:\t' + path)
pass
else:
print('\tToo many threads to restart directory.')
for job in jobs:
job.join()
# Save list of debug files.
def SaveDebugFiles(DebugFileList):
for file in DebugFileList:
try:
shutil.copyfile(file[0] + '\' + file[1], SaveLocation + file[1])
except PermissionError:
continue
if __name__ == '__main__':
with Manager() as manager:
# Iterate through all directories to make a list of all desired files.
DebugFileList = manager.list()
jobs = []
for path in CheckLocations:
p = Process(target=ParseFolderContents, args=(path, DebugFileList))
jobs.append(p)
p.start()
for job in jobs:
job.join()
print('\n' + str(len(DebugFileList)) + ' files found.\n')
if len(DebugFileList) == 0:
quit()
# Iterate through all debug files and copy them to local PC.
n = 25 # Number of files to grab for each parallel path.
TempList = [DebugFileList[i:i + n] for i in range(0, len(DebugFileList), n)] # Split list into small chunks.
jobs = []
for item in TempList:
p = Process(target=SaveDebugFiles, args=(item, ))
jobs.append(p)
p.start()
for job in jobs:
job.join()
不要轻视池的用处,尤其是当您想控制要创建的进程数时。他们还负责管理您的员工(create/start/join/distribute 大块工作)并帮助您收集潜在结果。
正如您自己意识到的那样,您创建了太多进程,以至于您似乎耗尽了如此多的系统资源以致于无法创建更多进程。
此外,代码中新进程的创建受外部因素控制,即文件树中的文件夹数量,这使得限制进程数量变得非常困难。此外,创建新进程会在 OS 上带来相当大的开销,您甚至可能最终将这些开销浪费在空目录上。另外,进程之间的上下文切换非常昂贵。
根据您创建的进程数量,给定您声明的文件夹数量,您的进程基本上只会坐在那里,大部分时间都处于空闲状态,同时等待一部分 CPU 时间来实际执行做一些工作。对于所述 CPU 时间会有很多争论,除非你有一台拥有数千个内核的超级计算机供你使用。即使一个进程有一些 CPU 的时间来工作,它也可能会花费相当多的时间等待 I/O。
话虽这么说,您可能想要研究使用线程来完成这样的任务。你可以在你的代码中做一些优化。从您的示例中,我看不出您为什么要拆分识别要复制的文件并将它们实际复制到不同的任务中。为什么不让您的员工立即复制他们发现与 RE 匹配的每个文件?
我将从主线程使用 os.walk
(我认为相当快)在相关目录中创建一个文件列表,然后将该列表卸载到一个工作池中,该工作池检查这些文件立即匹配并复制:
import os
import re
from multiprocessing.pool import ThreadPool
search_dirs = ["dir 1", "dir2"]
ptn = re.compile(r"your regex")
# your target dir definition
file_list = []
for topdir in search_dirs:
for root, dirs, files in os.walk(topdir):
for file in files:
file_list.append(os.path.join(root, file))
def copier(path):
if ptn.match(path):
# do your shutil.copyfile with the try-except right here
# obviously I did not want to start mindlessly copying around files on my box :)
return path
with ThreadPool(processes=10) as pool:
results = pool.map(copier, file_list)
# print all the processed files. For those that did not match, None is returned
print("\n".join([r for r in results if r]))
附带说明:不要手动连接路径 (file[0] + "\" + file[1]
),而是为此使用 os.path.join
。
我无法让它完全按照我的期望工作。 os.walk 很慢,我想到的所有其他方法要么速度相似,要么由于线程太多而崩溃。
我最终使用了我在上面发布的类似方法,但不是从顶级目录开始递归,而是向下一级或二级目录,直到有几个目录。然后它将依次在这些目录中的每一个开始递归,这限制了足以成功完成的线程数。执行时间类似于 os.walk,这可能会使实现更简单、更易读。