读取大文本文件(~20m 行),将函数应用于行,写入新文本文件
Read in large text file (~20m rows), apply function to rows, write to new text file
我有一个非常大的文本文件,还有一个函数可以对每一行执行我希望它执行的操作。但是,逐行阅读并应用函数时,大约需要三个小时。我想知道是否有一种方法可以通过分块或多处理来加快速度。
我的代码如下所示:
with open('f.txt', 'r') as f:
function(f,w)
其中函数接收大文本文件和空文本文件并应用函数并写入空文件。
我试过:
def multiprocess(f,w):
cores = multiprocessing.cpu_count()
with Pool(cores) as p:
pieces = p.map(function,f,w)
f.close()
w.close()
multiprocess(f,w)
但是当我这样做时,我得到一个 TypeError <= unsupported operand with type 'io.TextWrapper' and 'int'。这也可能是错误的方法,或者我可能完全做错了。任何建议将不胜感激。
用in迭代不是办法,但是可以一次调用多行,只需要求和一个或多个就可以读多行,这样程序读起来会更快。
看这个片段。
# Python code to
# demonstrate readlines()
L = ["Geeks\n", "for\n", "Geeks\n"]
# writing to file
file1 = open('myfile.txt', 'w')
file1.writelines(L)
file1.close()
# Using readlines()
file1 = open('myfile.txt', 'r')
Lines = file1.readlines()
count = 0
# Strips the newline character
for line in Lines:
count += 1
print("Line{}: {}".format(count, line.strip()))
我从:https://www.geeksforgeeks.org/read-a-file-line-by-line-in-python/.
即使您可以成功地将打开的文件对象作为参数 f
和 w
传递给池中的子 OS 进程(我认为您不能在任何 OS) 尝试同时读取和写入文件至少可以说是个坏主意。
一般来说,我建议使用Process class而不是Pool,假设输出最终结果需要与输入的20m行文件保持相同的顺序。
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process
最慢的解决方案,但 RAM 使用效率最高
- 您逐行执行和处理文件的初始解决方案
为了最大速度,但 RAM 消耗最多
- 通过
f.readlines()
将整个文件作为列表读入RAM,如果你的整个数据集可以放入内存,轻松
- 计算核心数(例如 8 个核心)
- 将列表平均分成 8 个列表
- 将每个列表传递给要由 Process 实例执行的函数(此时您的 RAM 使用率将进一步加倍,这是最大速度的折衷),但您应该
del
原来的释放一些 RAM 之后的大列表
- 每个进程按顺序逐行处理其整个块,并将其写入自己的输出文件(out_file1.txt、out_file2.txt 等)
- 让您的 OS 将您的输出文件按顺序连接成一个大输出文件。如果您是 运行 UNIX 系统,则可以使用
subprocess.run('cat out_file* > big_output.txt')
,或者 windows. 的等效 Windows 命令
对于速度和 RAM 之间的中间权衡,但最复杂的是,我们将不得不使用 Queue
class
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue
- 找出变量中的核心数
cores
(比如 8)
- 初始化8个队列,8个进程,将每个Queue传递给每个进程。此时每个进程应该打开自己的输出文件(outfile1.txt、outfile2.txt 等)
- 每个进程应轮询(并阻止)一大块 10_000 行,处理它们,并将它们按顺序写入各自的输出文件
- 在父进程的循环中,从输入的 20m 行文件中读取
10_000 * 8
行
- 将其分成几个列表(10K 块)以推送到您各自的进程队列
- 当您完成 20m 行后退出循环,将一个特殊值传递到每个进程队列中,以指示输入数据结束
- 当每个进程在其自己的队列中检测到特殊的数据结束值时,每个进程都应关闭其输出文件并退出
- 让您的 OS 将您的输出文件按顺序连接成一个大输出文件。如果您是 运行 UNIX 系统,则可以使用
subprocess.run('cat out_file* > big_output.txt')
,或者 windows. 的等效 Windows 命令
复杂?好吧,这通常是速度、RAM、复杂性之间的权衡。同样对于 20m 行的任务,需要确保数据处理尽可能最佳 - 尽可能多地内联函数,避免大量数学运算,尽可能在子进程中使用 Pandas / numpy 等。
我有一个非常大的文本文件,还有一个函数可以对每一行执行我希望它执行的操作。但是,逐行阅读并应用函数时,大约需要三个小时。我想知道是否有一种方法可以通过分块或多处理来加快速度。
我的代码如下所示:
with open('f.txt', 'r') as f:
function(f,w)
其中函数接收大文本文件和空文本文件并应用函数并写入空文件。
我试过:
def multiprocess(f,w):
cores = multiprocessing.cpu_count()
with Pool(cores) as p:
pieces = p.map(function,f,w)
f.close()
w.close()
multiprocess(f,w)
但是当我这样做时,我得到一个 TypeError <= unsupported operand with type 'io.TextWrapper' and 'int'。这也可能是错误的方法,或者我可能完全做错了。任何建议将不胜感激。
用in迭代不是办法,但是可以一次调用多行,只需要求和一个或多个就可以读多行,这样程序读起来会更快。
看这个片段。
# Python code to
# demonstrate readlines()
L = ["Geeks\n", "for\n", "Geeks\n"]
# writing to file
file1 = open('myfile.txt', 'w')
file1.writelines(L)
file1.close()
# Using readlines()
file1 = open('myfile.txt', 'r')
Lines = file1.readlines()
count = 0
# Strips the newline character
for line in Lines:
count += 1
print("Line{}: {}".format(count, line.strip()))
我从:https://www.geeksforgeeks.org/read-a-file-line-by-line-in-python/.
即使您可以成功地将打开的文件对象作为参数 f
和 w
传递给池中的子 OS 进程(我认为您不能在任何 OS) 尝试同时读取和写入文件至少可以说是个坏主意。
一般来说,我建议使用Process class而不是Pool,假设输出最终结果需要与输入的20m行文件保持相同的顺序。
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process
最慢的解决方案,但 RAM 使用效率最高
- 您逐行执行和处理文件的初始解决方案
为了最大速度,但 RAM 消耗最多
- 通过
f.readlines()
将整个文件作为列表读入RAM,如果你的整个数据集可以放入内存,轻松 - 计算核心数(例如 8 个核心)
- 将列表平均分成 8 个列表
- 将每个列表传递给要由 Process 实例执行的函数(此时您的 RAM 使用率将进一步加倍,这是最大速度的折衷),但您应该
del
原来的释放一些 RAM 之后的大列表 - 每个进程按顺序逐行处理其整个块,并将其写入自己的输出文件(out_file1.txt、out_file2.txt 等)
- 让您的 OS 将您的输出文件按顺序连接成一个大输出文件。如果您是 运行 UNIX 系统,则可以使用
subprocess.run('cat out_file* > big_output.txt')
,或者 windows. 的等效 Windows 命令
对于速度和 RAM 之间的中间权衡,但最复杂的是,我们将不得不使用 Queue
class
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue
- 找出变量中的核心数
cores
(比如 8) - 初始化8个队列,8个进程,将每个Queue传递给每个进程。此时每个进程应该打开自己的输出文件(outfile1.txt、outfile2.txt 等)
- 每个进程应轮询(并阻止)一大块 10_000 行,处理它们,并将它们按顺序写入各自的输出文件
- 在父进程的循环中,从输入的 20m 行文件中读取
10_000 * 8
行 - 将其分成几个列表(10K 块)以推送到您各自的进程队列
- 当您完成 20m 行后退出循环,将一个特殊值传递到每个进程队列中,以指示输入数据结束
- 当每个进程在其自己的队列中检测到特殊的数据结束值时,每个进程都应关闭其输出文件并退出
- 让您的 OS 将您的输出文件按顺序连接成一个大输出文件。如果您是 运行 UNIX 系统,则可以使用
subprocess.run('cat out_file* > big_output.txt')
,或者 windows. 的等效 Windows 命令
复杂?好吧,这通常是速度、RAM、复杂性之间的权衡。同样对于 20m 行的任务,需要确保数据处理尽可能最佳 - 尽可能多地内联函数,避免大量数学运算,尽可能在子进程中使用 Pandas / numpy 等。