读取和处理大文件时的多线程(内存太大)

Multithreading while reading and processing a huge file (too big for memory)

我有以下运行速度非常慢的代码。这是一个拆分大文件(80 gig)并将其放入树状文件夹结构以便快速查找的程序。我在代码中做了一些注释,以帮助您理解它。

# Libraries
import os


# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth


# Preperations
os.makedirs(outputdirectory)

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()


# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
    for line in infile:
        pipeline(line)

有没有办法让多线程工作?因为我自己尝试了一些我在网上找到的例子,它把所有的东西都放在内存中,导致我的电脑多次死机。

首先,(IMO)最简单的解决方案

如果这些行看起来完全独立,只需将文件分成 N 个块,将文件名作为程序参数传递给 open,并 运行 当前脚本的多个实例手动启动它们多个命令行。

优点:

  • 无需深入研究多进程、进程间通信等
  • 不需要过多改动代码

缺点:

  • 您需要对大文件进行预处理,将其分成多个块(尽管这将比您当前的执行时间快得多,因为您不会有每行打开-关闭的情况)
  • 您需要自己启动进程,为每个进程传递适当的文件名

这将实现为:

预处理:

APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
  chunk_id = 0
  next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
  while next_chunk:
    with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
      ofp.writelines(next_chunk)
    chunk_id += 1
    next_chunk = fp.readlines(APPROX_CHUNK_SIZE)

来自readlines docs:

If the optional sizehint argument is present, instead of reading up to EOF, whole lines totalling approximately sizehint bytes (possibly after rounding up to an internal buffer size) are read.

这样做并不能确保所有块中的行数为偶数,但会使预处理速度更快,因为您是按块读取而不是逐行读取。根据需要调整块大小。 另外,请注意,通过使用 readlines 我们可以确定块之间不会有断行,但是由于函数 returns 是一个行列表,我们使用 writelines 将其写入我们的输出文件(相当于循环遍历列表和 ofp.write(line))。为了完整起见,请注意,您还可以连接内存中的所有字符串并仅调用一次 write(即执行 ofp.write(''.join(next_chunk))),这可能会给您带来一些(次要的)性能优势,支付(多)更高的 RAM 使用率。

主要脚本:

您唯一需要的修改是在最上面:

import sys
file=sys.argv[1]
... # rest of your script here

通过使用 argv,您可以将命令行参数传递给您的程序(在本例中为要打开的文件)。然后,只需 运行 您的脚本为:

python process_the_file.py big_file_0.txt

这将 运行 一个进程。打开多个终端和 运行 相同的命令,每个终端使用 big_file_N.txt,它们将彼此独立。

注意:我使用argv[1],因为对于所有程序,argv(即argv[0])的第一个值始终是程序名字.


然后,multiprocessing

虽然有效,但第一个解决方案并不十分优雅,特别是因为如果您从一个大小为 80GB 的文件开始,您将有 80 个文件。

更简洁的解决方案是使用 python 的 multiprocessing 模块(重要:不是 threading!如果您不知道其中的区别,请查找“全局解释器”锁”以及为什么 python 中的多线程无法按照您认为的方式工作)。

我们的想法是让一个“生产者”进程打开大文件并不断地将其中的行放入队列中。然后,从队列中提取行并进行处理的“消费者”进程池。

优点:

  • 一个脚本搞定一切
  • 无需打开多个终端并进行输入

缺点:

  • 复杂性
  • 使用进程间通信,有一些开销

这将按如下方式实施:

# Libraries
import os
import multiprocessing

outputdirectory="sorted"
depth=4 # This is the tree depth

# Process each line in the file
def pipeline(line):
    # Strip symbols from line
    line_stripped=''.join(e for e in line if e.isalnum())
    # Reverse the line
    line_stripped_reversed=line_stripped[::-1]
    file=outputdirectory
    # Create path location in folderbased tree
    for i in range(min((depth),len(line_stripped))):
        file=os.path.join(file,line_stripped_reversed[i])
    # Create folders if they don't exist
    os.makedirs(os.path.dirname(file), exist_ok=True)
    # Name the file, with "-file"
    file=file+"-file"
    # This is the operation that slows everything down. 
    # It opens, writes and closes a lot of small files. 
    # I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
    f = open(file, "a")
    f.write(line)
    f.close()

if __name__ == '__main__':
    # Variables
    file="80_gig_file.txt"

    # Preperations
    os.makedirs(outputdirectory)
    pool = multiprocessing.Pool() # by default, 1 process per CPU
    LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM

    with open(file) as infile:
        next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
        pool.close()
        pool.join()

if __name__ == '__main__' 行是将 运行 在每个进程上的代码与 运行 仅在“父亲”上的代码分开的障碍。每个进程都定义了 pipeline,但实际上只有父进程产生了一个工作池并应用了该函数。您会找到有关 multiprocessing.map here

的更多详细信息

编辑:

添加了关闭和加入池以防止主进程退出并杀死进程中的子进程。