读取和处理大文件时的多线程(内存太大)
Multithreading while reading and processing a huge file (too big for memory)
我有以下运行速度非常慢的代码。这是一个拆分大文件(80 gig)并将其放入树状文件夹结构以便快速查找的程序。我在代码中做了一些注释,以帮助您理解它。
# Libraries
import os
# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth
# Preperations
os.makedirs(outputdirectory)
# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down.
# It opens, writes and closes a lot of small files.
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()
# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
for line in infile:
pipeline(line)
有没有办法让多线程工作?因为我自己尝试了一些我在网上找到的例子,它把所有的东西都放在内存中,导致我的电脑多次死机。
首先,(IMO)最简单的解决方案
如果这些行看起来完全独立,只需将文件分成 N 个块,将文件名作为程序参数传递给 open,并 运行 当前脚本的多个实例手动启动它们多个命令行。
优点:
- 无需深入研究多进程、进程间通信等
- 不需要过多改动代码
缺点:
- 您需要对大文件进行预处理,将其分成多个块(尽管这将比您当前的执行时间快得多,因为您不会有每行打开-关闭的情况)
- 您需要自己启动进程,为每个进程传递适当的文件名
这将实现为:
预处理:
APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
chunk_id = 0
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
while next_chunk:
with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
ofp.writelines(next_chunk)
chunk_id += 1
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read.
这样做并不能确保所有块中的行数为偶数,但会使预处理速度更快,因为您是按块读取而不是逐行读取。根据需要调整块大小。
另外,请注意,通过使用 readlines
我们可以确定块之间不会有断行,但是由于函数 returns 是一个行列表,我们使用 writelines
将其写入我们的输出文件(相当于循环遍历列表和 ofp.write(line)
)。为了完整起见,请注意,您还可以连接内存中的所有字符串并仅调用一次 write
(即执行 ofp.write(''.join(next_chunk))
),这可能会给您带来一些(次要的)性能优势,支付(多)更高的 RAM 使用率。
主要脚本:
您唯一需要的修改是在最上面:
import sys
file=sys.argv[1]
... # rest of your script here
通过使用 argv
,您可以将命令行参数传递给您的程序(在本例中为要打开的文件)。然后,只需 运行 您的脚本为:
python process_the_file.py big_file_0.txt
这将 运行 一个进程。打开多个终端和 运行 相同的命令,每个终端使用 big_file_N.txt
,它们将彼此独立。
注意:我使用argv[1]
,因为对于所有程序,argv
(即argv[0]
)的第一个值始终是程序名字.
然后,multiprocessing
解
虽然有效,但第一个解决方案并不十分优雅,特别是因为如果您从一个大小为 80GB 的文件开始,您将有 80 个文件。
更简洁的解决方案是使用 python 的 multiprocessing
模块(重要:不是 threading
!如果您不知道其中的区别,请查找“全局解释器”锁”以及为什么 python 中的多线程无法按照您认为的方式工作)。
我们的想法是让一个“生产者”进程打开大文件并不断地将其中的行放入队列中。然后,从队列中提取行并进行处理的“消费者”进程池。
优点:
- 一个脚本搞定一切
- 无需打开多个终端并进行输入
缺点:
- 复杂性
- 使用进程间通信,有一些开销
这将按如下方式实施:
# Libraries
import os
import multiprocessing
outputdirectory="sorted"
depth=4 # This is the tree depth
# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down.
# It opens, writes and closes a lot of small files.
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()
if __name__ == '__main__':
# Variables
file="80_gig_file.txt"
# Preperations
os.makedirs(outputdirectory)
pool = multiprocessing.Pool() # by default, 1 process per CPU
LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM
with open(file) as infile:
next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
pool.close()
pool.join()
if __name__ == '__main__'
行是将 运行 在每个进程上的代码与 运行 仅在“父亲”上的代码分开的障碍。每个进程都定义了 pipeline
,但实际上只有父进程产生了一个工作池并应用了该函数。您会找到有关 multiprocessing.map
here
的更多详细信息
编辑:
添加了关闭和加入池以防止主进程退出并杀死进程中的子进程。
我有以下运行速度非常慢的代码。这是一个拆分大文件(80 gig)并将其放入树状文件夹结构以便快速查找的程序。我在代码中做了一些注释,以帮助您理解它。
# Libraries
import os
# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth
# Preperations
os.makedirs(outputdirectory)
# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down.
# It opens, writes and closes a lot of small files.
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()
# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
for line in infile:
pipeline(line)
有没有办法让多线程工作?因为我自己尝试了一些我在网上找到的例子,它把所有的东西都放在内存中,导致我的电脑多次死机。
首先,(IMO)最简单的解决方案
如果这些行看起来完全独立,只需将文件分成 N 个块,将文件名作为程序参数传递给 open,并 运行 当前脚本的多个实例手动启动它们多个命令行。
优点:
- 无需深入研究多进程、进程间通信等
- 不需要过多改动代码
缺点:
- 您需要对大文件进行预处理,将其分成多个块(尽管这将比您当前的执行时间快得多,因为您不会有每行打开-关闭的情况)
- 您需要自己启动进程,为每个进程传递适当的文件名
这将实现为:
预处理:
APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
chunk_id = 0
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
while next_chunk:
with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
ofp.writelines(next_chunk)
chunk_id += 1
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read.
这样做并不能确保所有块中的行数为偶数,但会使预处理速度更快,因为您是按块读取而不是逐行读取。根据需要调整块大小。
另外,请注意,通过使用 readlines
我们可以确定块之间不会有断行,但是由于函数 returns 是一个行列表,我们使用 writelines
将其写入我们的输出文件(相当于循环遍历列表和 ofp.write(line)
)。为了完整起见,请注意,您还可以连接内存中的所有字符串并仅调用一次 write
(即执行 ofp.write(''.join(next_chunk))
),这可能会给您带来一些(次要的)性能优势,支付(多)更高的 RAM 使用率。
主要脚本:
您唯一需要的修改是在最上面:
import sys
file=sys.argv[1]
... # rest of your script here
通过使用 argv
,您可以将命令行参数传递给您的程序(在本例中为要打开的文件)。然后,只需 运行 您的脚本为:
python process_the_file.py big_file_0.txt
这将 运行 一个进程。打开多个终端和 运行 相同的命令,每个终端使用 big_file_N.txt
,它们将彼此独立。
注意:我使用argv[1]
,因为对于所有程序,argv
(即argv[0]
)的第一个值始终是程序名字.
然后,multiprocessing
解
虽然有效,但第一个解决方案并不十分优雅,特别是因为如果您从一个大小为 80GB 的文件开始,您将有 80 个文件。
更简洁的解决方案是使用 python 的 multiprocessing
模块(重要:不是 threading
!如果您不知道其中的区别,请查找“全局解释器”锁”以及为什么 python 中的多线程无法按照您认为的方式工作)。
我们的想法是让一个“生产者”进程打开大文件并不断地将其中的行放入队列中。然后,从队列中提取行并进行处理的“消费者”进程池。
优点:
- 一个脚本搞定一切
- 无需打开多个终端并进行输入
缺点:
- 复杂性
- 使用进程间通信,有一些开销
这将按如下方式实施:
# Libraries
import os
import multiprocessing
outputdirectory="sorted"
depth=4 # This is the tree depth
# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down.
# It opens, writes and closes a lot of small files.
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()
if __name__ == '__main__':
# Variables
file="80_gig_file.txt"
# Preperations
os.makedirs(outputdirectory)
pool = multiprocessing.Pool() # by default, 1 process per CPU
LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM
with open(file) as infile:
next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
pool.close()
pool.join()
if __name__ == '__main__'
行是将 运行 在每个进程上的代码与 运行 仅在“父亲”上的代码分开的障碍。每个进程都定义了 pipeline
,但实际上只有父进程产生了一个工作池并应用了该函数。您会找到有关 multiprocessing.map
here
编辑:
添加了关闭和加入池以防止主进程退出并杀死进程中的子进程。