在 python 中批处理非常大的文本文件
Batching very large text file in python
我正在尝试将一个非常大的文本文件(大约 150 GB)批处理成几个较小的文本文件(大约 10 GB)。
我的大致流程是:
# iterate over file one line at a time
# accumulate batch as string
--> # given a certain count that correlates to the size of my current accumulated batch and when that size is met: (this is where I am unsure)
# write to file
# accumulate size count
我有一个粗略的指标来计算何时进行批处理(当达到所需的批处理大小时),但我不太清楚我应该如何计算给定的写入磁盘的频率批。例如,如果我的批处理大小是 10 GB,我假设我将需要迭代写入而不是将整个 10 GB 的批处理保存在内存中。我显然不想写太多,因为这可能非常昂贵。
您是否有任何粗略的计算或技巧可以用来确定何时写入磁盘以完成诸如此类的任务,例如大小与内存之类的?
这里是一个逐行写入的例子。它以二进制模式打开以避免行解码步骤,该步骤花费适度的时间但可能会影响字符计数。例如,utf-8 编码可能使用磁盘上的多个字节来表示单个 python 字符。
4 Meg 是缓冲的猜测。这个想法是让操作系统一次读取更多文件,减少查找时间。这是否有效或使用的最佳数字是有争议的 - 并且对于不同的操作系统会有所不同。我发现 4 兆有所不同...但那是几年前的事了,情况发生了变化。
outfile_template = "outfile-{}.txt"
infile_name = "infile.txt"
chunksize = 10_000_000_000
MEB = 2**20 # mebibyte
count = 0
byteswritten = 0
infile = open(infile_name, "rb", buffering=4*MEB)
outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)
try:
for line in infile:
if byteswritten > chunksize:
outfile.close()
byteswritten = 0
count += 1
outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)
outfile.write(line)
byteswritten += len(line)
finally:
infile.close()
outfile.close()
我使用稍微修改过的版本来解析 250GB json,我选择我需要多少个小文件 number_of_slices
然后我找到文件切片的位置(我总是寻找行结尾)。最后我用 file.seek
和 file.read(chunk)
切片文件
import os
import mmap
FULL_PATH_TO_FILE = 'full_path_to_a_big_file'
OUTPUT_PATH = 'full_path_to_a_output_dir' # where sliced files will be generated
def next_newline_finder(mmapf):
def nl_find(mmapf):
while 1:
current = hex(mmapf.read_byte())
if hex(ord('\n')) == current: # or whatever line-end symbol
return(mmapf.tell())
return nl_find(mmapf)
# find positions where to slice a file
file_info = os.stat(FULL_PATH_TO_FILE)
file_size = file_info.st_size
positions_for_file_slice = [0]
number_of_slices = 15 # say u want slice the big file to 15 smaller files
size_per_slice = file_size//number_of_slices
with open(FULL_PATH_TO_FILE, "r+b") as f:
mmapf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
slice_counter = 1
while slice_counter < number_of_slices:
pos = size_per_slice*slice_counter
mmapf.seek(pos)
newline_pos = next_newline_finder(mmapf)
positions_for_file_slice.append(newline_pos)
slice_counter += 1
# create ranges for found positions (from, to)
positions_for_file_slice = [(pos, positions_for_file_slice[i+1]) if i < (len(positions_for_file_slice)-1) else (
positions_for_file_slice[i], file_size) for i, pos in enumerate(positions_for_file_slice)]
# do actual slice of a file
with open(FULL_PATH_TO_FILE, "rb") as f:
for i, position_pair in enumerate(positions_for_file_slice):
read_from, read_to = position_pair
f.seek(read_from)
chunk = f.read(read_to-read_from)
with open(os.path.join(OUTPUT_PATH, f'dummyfile{i}.json'), 'wb') as chunk_file:
chunk_file.write(chunk)
假设您的大文件是简单的非结构化文本,即这对像 JSON 这样的结构化文本没有好处,这里有一个读取每一行的替代方法:读取输入文件的大二进制位直到达到您的块大小然后阅读几行,关闭当前输出文件并继续下一个。
我将此与使用@tdelaney 代码逐行进行了比较,该代码采用与我的代码相同的块大小 - 该代码花费了 250 秒将 12GiB 输入文件拆分为 6x2GiB 块,而这花费了大约 50 秒,所以可能是五次更快,看起来它 I/O 绑定在我的 SSD 运行 >200MiB/s 读写上,其中逐行是 运行 40-50MiB/s 读写。
我关闭了缓冲,因为没有太多意义。 bite 的大小和缓冲设置可以调整以提高性能,还没有尝试任何其他设置,因为对我来说它似乎是 I/O 无论如何。
import time
outfile_template = "outfile-{}.txt"
infile_name = "large.text"
chunksize = 2_000_000_000
MEB = 2**20 # mebibyte
bitesize = 4_000_000 # the size of the reads (and writes) working up to chunksize
count = 0
starttime = time.perf_counter()
infile = open(infile_name, "rb", buffering=0)
outfile = open(outfile_template.format(count), "wb", buffering=0)
while True:
byteswritten = 0
while byteswritten < chunksize:
bite = infile.read(bitesize)
# check for EOF
if not bite:
break
outfile.write(bite)
byteswritten += len(bite)
# check for EOF
if not bite:
break
for i in range(2):
l = infile.readline()
# check for EOF
if not l:
break
outfile.write(l)
# check for EOF
if not l:
break
outfile.close()
count += 1
print( count )
outfile = open(outfile_template.format(count), "wb", buffering=0)
outfile.close()
infile.close()
endtime = time.perf_counter()
elapsed = endtime-starttime
print( f"Elapsed= {elapsed}" )
注意我没有详尽地测试它不会丢失数据,尽管没有证据表明它确实丢失了任何东西你应该自己验证。
通过检查块末尾的时间以查看还有多少数据要读取,可能有助于增加一些稳健性,这样您就不会以最后一个输出文件的长度为 0(或更短)而告终比 bitesize)
HTH
巴尼
我正在尝试将一个非常大的文本文件(大约 150 GB)批处理成几个较小的文本文件(大约 10 GB)。
我的大致流程是:
# iterate over file one line at a time
# accumulate batch as string
--> # given a certain count that correlates to the size of my current accumulated batch and when that size is met: (this is where I am unsure)
# write to file
# accumulate size count
我有一个粗略的指标来计算何时进行批处理(当达到所需的批处理大小时),但我不太清楚我应该如何计算给定的写入磁盘的频率批。例如,如果我的批处理大小是 10 GB,我假设我将需要迭代写入而不是将整个 10 GB 的批处理保存在内存中。我显然不想写太多,因为这可能非常昂贵。
您是否有任何粗略的计算或技巧可以用来确定何时写入磁盘以完成诸如此类的任务,例如大小与内存之类的?
这里是一个逐行写入的例子。它以二进制模式打开以避免行解码步骤,该步骤花费适度的时间但可能会影响字符计数。例如,utf-8 编码可能使用磁盘上的多个字节来表示单个 python 字符。
4 Meg 是缓冲的猜测。这个想法是让操作系统一次读取更多文件,减少查找时间。这是否有效或使用的最佳数字是有争议的 - 并且对于不同的操作系统会有所不同。我发现 4 兆有所不同...但那是几年前的事了,情况发生了变化。
outfile_template = "outfile-{}.txt"
infile_name = "infile.txt"
chunksize = 10_000_000_000
MEB = 2**20 # mebibyte
count = 0
byteswritten = 0
infile = open(infile_name, "rb", buffering=4*MEB)
outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)
try:
for line in infile:
if byteswritten > chunksize:
outfile.close()
byteswritten = 0
count += 1
outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)
outfile.write(line)
byteswritten += len(line)
finally:
infile.close()
outfile.close()
我使用稍微修改过的版本来解析 250GB json,我选择我需要多少个小文件 number_of_slices
然后我找到文件切片的位置(我总是寻找行结尾)。最后我用 file.seek
和 file.read(chunk)
import os
import mmap
FULL_PATH_TO_FILE = 'full_path_to_a_big_file'
OUTPUT_PATH = 'full_path_to_a_output_dir' # where sliced files will be generated
def next_newline_finder(mmapf):
def nl_find(mmapf):
while 1:
current = hex(mmapf.read_byte())
if hex(ord('\n')) == current: # or whatever line-end symbol
return(mmapf.tell())
return nl_find(mmapf)
# find positions where to slice a file
file_info = os.stat(FULL_PATH_TO_FILE)
file_size = file_info.st_size
positions_for_file_slice = [0]
number_of_slices = 15 # say u want slice the big file to 15 smaller files
size_per_slice = file_size//number_of_slices
with open(FULL_PATH_TO_FILE, "r+b") as f:
mmapf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
slice_counter = 1
while slice_counter < number_of_slices:
pos = size_per_slice*slice_counter
mmapf.seek(pos)
newline_pos = next_newline_finder(mmapf)
positions_for_file_slice.append(newline_pos)
slice_counter += 1
# create ranges for found positions (from, to)
positions_for_file_slice = [(pos, positions_for_file_slice[i+1]) if i < (len(positions_for_file_slice)-1) else (
positions_for_file_slice[i], file_size) for i, pos in enumerate(positions_for_file_slice)]
# do actual slice of a file
with open(FULL_PATH_TO_FILE, "rb") as f:
for i, position_pair in enumerate(positions_for_file_slice):
read_from, read_to = position_pair
f.seek(read_from)
chunk = f.read(read_to-read_from)
with open(os.path.join(OUTPUT_PATH, f'dummyfile{i}.json'), 'wb') as chunk_file:
chunk_file.write(chunk)
假设您的大文件是简单的非结构化文本,即这对像 JSON 这样的结构化文本没有好处,这里有一个读取每一行的替代方法:读取输入文件的大二进制位直到达到您的块大小然后阅读几行,关闭当前输出文件并继续下一个。
我将此与使用@tdelaney 代码逐行进行了比较,该代码采用与我的代码相同的块大小 - 该代码花费了 250 秒将 12GiB 输入文件拆分为 6x2GiB 块,而这花费了大约 50 秒,所以可能是五次更快,看起来它 I/O 绑定在我的 SSD 运行 >200MiB/s 读写上,其中逐行是 运行 40-50MiB/s 读写。
我关闭了缓冲,因为没有太多意义。 bite 的大小和缓冲设置可以调整以提高性能,还没有尝试任何其他设置,因为对我来说它似乎是 I/O 无论如何。
import time
outfile_template = "outfile-{}.txt"
infile_name = "large.text"
chunksize = 2_000_000_000
MEB = 2**20 # mebibyte
bitesize = 4_000_000 # the size of the reads (and writes) working up to chunksize
count = 0
starttime = time.perf_counter()
infile = open(infile_name, "rb", buffering=0)
outfile = open(outfile_template.format(count), "wb", buffering=0)
while True:
byteswritten = 0
while byteswritten < chunksize:
bite = infile.read(bitesize)
# check for EOF
if not bite:
break
outfile.write(bite)
byteswritten += len(bite)
# check for EOF
if not bite:
break
for i in range(2):
l = infile.readline()
# check for EOF
if not l:
break
outfile.write(l)
# check for EOF
if not l:
break
outfile.close()
count += 1
print( count )
outfile = open(outfile_template.format(count), "wb", buffering=0)
outfile.close()
infile.close()
endtime = time.perf_counter()
elapsed = endtime-starttime
print( f"Elapsed= {elapsed}" )
注意我没有详尽地测试它不会丢失数据,尽管没有证据表明它确实丢失了任何东西你应该自己验证。
通过检查块末尾的时间以查看还有多少数据要读取,可能有助于增加一些稳健性,这样您就不会以最后一个输出文件的长度为 0(或更短)而告终比 bitesize)
HTH 巴尼