Most efficient way to convert large .txt files (size >30GB) into .csv after pre-processing using Python

I have data in a .txt file that looks like this (let's name it "myfile.txt"):

28807644'~'0'~'Maun FCU'~'US#@#@#28855353'~'0'~'WNB Holdings LLC'~'US#@#@#29212330'~'0'~'Idaho First Bank'~'US#@#@#29278777'~'0'~'Republic Bank of Arizona'~'US#@#@#29633181'~'0'~'Friendly Hills Bank'~'US#@#@#29760145'~'0'~'The Freedom Bank of Virginia'~'US#@#@#100504846'~'0'~'Community First Fund Federal Credit Union'~'US#@#@#

I have tried several ways to convert this .txt into a .csv, one of them being the csv library, but since I'm quite fond of pandas, I used the following:

import pandas as pd
import time

# Note the time at the start of the program
start = time.time()

# Set the path where our file is located and read it
path = r'myfile.txt'
with open(path, 'r') as f:
    content = f.read()

# Replace the undesired strings and introduce a line break
content_filtered = content.replace("#@#@#", "\n").replace("'", "")

# Read everything into columns with the separator "~" (skipping the trailing empty entry)
df = pd.DataFrame([x.split('~') for x in content_filtered.split('\n') if x],
                  columns=['a', 'b', 'c', 'd'])

# Write the dataframe out to a csv
df.to_csv(path.replace('.txt', '.csv'), index=False)

end = time.time()

# Total time taken to write the file
print("Execution time in seconds: ", (end - start))

This takes around 35 seconds to process a 300MB file, which is performance I can live with, but when I try to do the same with a much bigger file (35GB in size) it produces a MemoryError.

I tried the csv library with similar results. I also tried putting everything into a list and then writing it out to a CSV:

import csv

# split_content is the list of row lists produced by the same pre-processing as above
# Write it out to CSV
with open(path.replace('.txt', '.csv'), "w", newline="") as outfile:
    write = csv.writer(outfile)
    write.writerows(split_content)

The results were similar, not much of an improvement. Is there a method or approach I could use to convert very large .txt files (over 35GB) into .csv?

I'd be happy to read any suggestions you may have, thanks in advance!

Since your code just does replacements directly, you can read all the data sequentially and detect the parts that need replacing as you go:

def process(fn_in, fn_out, columns):
    new_line = b'#@#@#'
    with open(fn_out, 'wb') as f_out:
        # write the header
        f_out.write((','.join(columns) + '\n').encode())
        i = 0  # number of bytes of the record separator matched so far
        with open(fn_in, "rb") as f_in:
            while (b := f_in.read(1)):
                if ord(b) == new_line[i]:
                    # keep matching the separator block
                    i += 1
                    if i == len(new_line):
                        # if matched entirely, write just a newline
                        f_out.write(b'\n')
                        i = 0
                    # write nothing while matching
                    continue
                elif i > 0:
                    # if you reach this, it was a partial match, write it back out
                    f_out.write(new_line[:i])
                    i = 0
                    # the current byte may itself start a new separator match
                    if ord(b) == new_line[0]:
                        i = 1
                        continue
                if b == b"'":
                    # drop the quotes entirely
                    pass
                elif b == b"~":
                    # the field separator becomes a comma
                    f_out.write(b',')
                else:
                    # write the byte if no match
                    f_out.write(b)
        if i > 0:
            # the file ended in the middle of a partial match; flush it
            f_out.write(new_line[:i])


process('my_file.txt', 'out.csv', ['a', 'b', 'c', 'd'])

Doing it this way is fast. You could improve performance by reading in chunks, but it is already very fast as it is.

The advantage of this method over yours is that it holds almost nothing in memory, although it does very little to optimise how fast the file is read.
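
If you do want the chunked reading, a sketch of the same state machine fed from fixed-size chunks could look like this (the name process_chunked, the chunk_size parameter and the 1 MiB default are just illustrative choices, not something I benchmarked):

def process_chunked(fn_in, fn_out, columns, chunk_size=1 << 20):
    # Same byte-level state machine as process() above, but fed from
    # fixed-size chunks so there is one read/write call per chunk.
    sep = b'#@#@#'
    quote, tilde = ord("'"), ord('~')
    i = 0  # bytes of the record separator matched so far
    with open(fn_in, 'rb') as f_in, open(fn_out, 'wb') as f_out:
        f_out.write((','.join(columns) + '\n').encode())
        out = bytearray()
        while (chunk := f_in.read(chunk_size)):
            for b in chunk:  # iterating over bytes yields ints
                if b == sep[i]:
                    i += 1
                    if i == len(sep):
                        out += b'\n'
                        i = 0
                    continue
                if i > 0:
                    # partial separator match that did not complete: emit it
                    out += sep[:i]
                    i = 0
                    if b == sep[0]:
                        i = 1
                        continue
                if b == quote:
                    continue  # drop the quotes
                out.append(ord(',') if b == tilde else b)
            f_out.write(out)  # flush once per chunk
            out.clear()
        if i > 0:
            f_out.write(sep[:i])  # trailing partial at end of file


process_chunked('my_file.txt', 'out.csv', ['a', 'b', 'c', 'd'])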

Edit: there was a big bug in an edge case, which I only realised after re-reading; it is fixed now.

I took your example string and made a sample file by multiplying that string 100 million times (something like your_string*1e8...), which gave me a 31GB test file.

Following @Grismar's suggestion of working in chunks, I did the following, and it processed that 31GB file in ~2 minutes, with peak RAM usage depending on the chunk size.

The complicated part is keeping track of the field and record separators, which are multiple characters long, will certainly span across a chunk, and would thus be truncated.

My solution is to inspect the end of each chunk and check whether it ends with a partial separator. If it does, that partial is removed from the end of the current chunk, the current chunk is written out, and the partial becomes the beginning of (and should be completed by) the next chunk:

CHUNK_SZ = 1024 * 1024

FS = "'~'"
RS = '#@#@#'

# With chars repeated in the separators, check most specific (least ambiguous)
# to least specific (most ambiguous) to definitively catch a partial with the
# fewest number of checks
PARTIAL_RSES = ['#@#@', '#@#', '#@', '#']
PARTIAL_FSES = ["'~", "'"]
ALL_PARTIALS = PARTIAL_FSES + PARTIAL_RSES

f_out = open('out.csv', 'w')
f_out.write('a,b,c,d\n')

f_in = open('my_file.txt')
line = ''
while True:
    # Read chunks till no more, then break out
    chunk = f_in.read(CHUNK_SZ)
    if not chunk:
        break

    # Any previous partial separator, plus new chunk
    line += chunk

    # Check the end of the chunk for a partial FS or RS (needed only because the separators are more than one char)
    final_partial = ''

    if line.endswith(FS) or line.endswith(RS):
        pass  # Write-out will replace complete FS or RS
    else:
        for partial in ALL_PARTIALS:
            if line.endswith(partial):
                final_partial = partial
                line = line[:-len(partial)]
                break

    # Process/write chunk
    f_out.write(line
                .replace(FS, ',')
                .replace(RS, '\n'))

    # Add partial back, to be completed next chunk
    line = final_partial


# If the file ended on a partial separator, write it out untouched
if line:
    f_out.write(line)

# Clean up
f_in.close()
f_out.close()
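
As a quick, optional sanity check you could stream the resulting out.csv back in with pandas in chunks and confirm it parses into the expected four columns without loading it all at once (the chunk size here is arbitrary):

import pandas as pd

n_rows = 0
for part in pd.read_csv('out.csv', chunksize=1_000_000):
    # each part is a DataFrame of at most 1,000,000 rows
    assert list(part.columns) == ['a', 'b', 'c', 'd']
    n_rows += len(part)

print(n_rows, 'records written')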