读取大文本文件(~20m 行),将函数应用于行,写入新文本文件

Read in large text file (~20m rows), apply function to rows, write to new text file

我有一个非常大的文本文件,还有一个函数可以对每一行执行我希望它执行的操作。但是,逐行阅读并应用函数时,大约需要三个小时。我想知道是否有一种方法可以通过分块或多处理来加快速度。

我的代码如下所示:

with open('f.txt', 'r') as f:
    function(f,w)

其中函数接收大文本文件和空文本文件并应用函数并写入空文件。

我试过:

def multiprocess(f,w):    
    cores = multiprocessing.cpu_count()

    with Pool(cores) as p:
        pieces = p.map(function,f,w)
    
    f.close()
    w.close()

multiprocess(f,w)

但是当我这样做时,我得到一个 TypeError <= unsupported operand with type 'io.TextWrapper' and 'int'。这也可能是错误的方法,或者我可能完全做错了。任何建议将不胜感激。

用in迭代不是办法,但是可以一次调用多行,只需要求和一个或多个就可以读多行,这样程序读起来会更快。

看这个片段。

# Python code to
# demonstrate readlines()
 
L = ["Geeks\n", "for\n", "Geeks\n"]
 
# writing to file
file1 = open('myfile.txt', 'w')
file1.writelines(L)
file1.close()
 
# Using readlines()
file1 = open('myfile.txt', 'r')
Lines = file1.readlines()
 
count = 0
# Strips the newline character
for line in Lines:
    count += 1
    print("Line{}: {}".format(count, line.strip()))

我从:https://www.geeksforgeeks.org/read-a-file-line-by-line-in-python/.

即使您可以成功地将打开的文件对象作为参数 fw 传递给池中的子 OS 进程(我认为您不能在任何 OS) 尝试同时读取和写入文件至少可以说是个坏主意。

一般来说,我建议使用Process class而不是Pool,假设输出最终结果需要与输入的20m行文件保持相同的顺序。

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process

最慢的解决方案,但 RAM 使用效率最高

  • 您逐行执行和处理文件的初始解决方案

为了最大速度,但 RAM 消耗最多

  • 通过f.readlines()将整个文件作为列表读入RAM,如果你的整个数据集可以放入内存,轻松
  • 计算核心数(例如 8 个核心)
  • 将列表平均分成 8 个列表
  • 将每个列表传递给要由 Process 实例执行的函数(此时您的 RAM 使用率将进一步加倍,这是最大速度的折衷),但您应该 del 原来的释放一些 RAM 之后的大列表
  • 每个进程按顺序逐行处理其整个块,并将其写入自己的输出文件(out_file1.txt、out_file2.txt 等)
  • 让您的 OS 将您的输出文件按顺序连接成一个大输出文件。如果您是 运行 UNIX 系统,则可以使用 subprocess.run('cat out_file* > big_output.txt'),或者 windows.
  • 的等效 Windows 命令

对于速度和 RAM 之间的中间权衡,但最复杂的是,我们将不得不使用 Queue class

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue

  • 找出变量中的核心数 cores(比如 8)
  • 初始化8个队列,8个进程,将每个Queue传递给每个进程。此时每个进程应该打开自己的输出文件(outfile1.txt、outfile2.txt 等)
  • 每个进程应轮询(并阻止)一大块 10_000 行,处理它们,并将它们按顺序写入各自的输出文件
  • 在父进程的循环中,从输入的 20m 行文件中读取 10_000 * 8
  • 将其分成几个列表(10K 块)以推送到您各自的进程队列
  • 当您完成 20m 行后退出循环,将一个特殊值传递到每个进程队列中,以指示输入数据结束
  • 当每个进程在其自己的队列中检测到特殊的数据结束值时,每个进程都应关闭其输出文件并退出
  • 让您的 OS 将您的输出文件按顺序连接成一个大输出文件。如果您是 运行 UNIX 系统,则可以使用 subprocess.run('cat out_file* > big_output.txt'),或者 windows.
  • 的等效 Windows 命令

复杂?好吧,这通常是速度、RAM、复杂性之间的权衡。同样对于 20m 行的任务,需要确保数据处理尽可能最佳 - 尽可能多地内联函数,避免大量数学运算,尽可能在子进程中使用 Pandas / numpy 等。