将大文件拆分成块

Splitting a large file into chunks

我有一个包含 7946479 条记录的文件,我想逐行读取文件并插入到数据库 (sqlite) 中。我的第一种方法是打开文件逐行读取记录并同时插入数据库,因为它处理大量数据需要很长时间 time.I 想改变这种天真的方法所以当我搜索时在互联网上我看到了这个 [python-csv-to-sqlite][1] ,他们在 csv 文件中有数据,但我的文件是 dat 格式但是我喜欢这个问题的答案,所以现在我正在尝试像在解决方案中那样做。

他们使用的方法就像是首先将整个文件分成块,然后进行数据库事务,而不是一次写入每条记录。

所以我开始编写将文件分成块的代码这是我的代码,

file = r'files/jan.dat'
test_file = r'random_test.txt'


def chunks(file_obj, size=10000):
counter = 0
file_chunks = []
temp_chunks = []

for line in file_obj:
    if line == '\n':
        continue
    if counter != size:
        temp_chunks.append(line)
        counter += 1
    else:
        file_chunks.append(temp_chunks)
        temp_chunks = []
        counter = 0
file_obj.close()
if len(temp_chunks) != 0:
    file_chunks.append(temp_chunks)

yield file_chunks

if __name__ == '__main__':
    split_files = chunks(open(test_file))
    for chunk in split_files:
        print(len(chunk))

输出是 795,但我想要的是将整个文件拆分成大小为 10000 的块

我不知道这里出了什么问题,我不能在这里分享我的整个文件所以为了测试可以使用这段代码生成一个包含 7946479 行的文件

TEXT = 'Hello world'
FILE_LENGTH = 7946479

counter = 0
with open(r'random_test.txt', 'w') as f:
    for _ in range(FILE_LENGTH):
        f.write(f"{TEXT}\n")

这是我的原始文件的样子(文件格式是 dat

lat lon day mon t2m rh2m    sf  ws
5   60  1   1   299.215 94.737  209.706 5.213
5   60.25   1   1   299.25  94.728  208.868 5.137
5   60.5    1   1   299.295 94.695  207.53  5.032
5   60.75   1   1   299.353 94.623  206.18  4.945
5   61  1   1   299.417 94.522  204.907 4.833
5   61.25   1   1   299.447 94.503  204.219 4.757
5   61.5    1   1   299.448 94.525  203.933 4.68
5   61.75   1   1   299.443 94.569  204.487 4.584
5   62  1   1   299.44  94.617  204.067 4.464

作为解决任务耗时过长问题的方法,我建议使用多处理而不是将文本分块(因为它会花费同样长的时间,但需要更多步骤)。使用 multiprocessing 库允许多个处理核心并行执行相同的任务,从而缩短 运行 时间。这是一个例子。

import multiprocessing as mp

# Step 1: Use multiprocessing.Pool() and specify number of cores to use (here I use 4).
pool = mp.Pool(4)

# Step 2: Use pool.starmap which takes a multiple iterable arguments
results = pool.starmap(My_Function, [(Parameter1, Parameter2, Parameter3) for i in data])
    
# Step 3: Don't forget to close
pool.close()

分块文件的一种简单方法是使用 f.read(size),直到没有内容为止。但是,此方法适用于字符编号而不是行。

test_file = 'random_test.txt'


def chunks(file_name, size=10000):
    with open(file_name) as f:
        while content := f.read(size):
            yield content


if __name__ == '__main__':
    split_files = chunks(test_file)
    for chunk in split_files:
        print(len(chunk))

对于最后一个块,它将占用剩下的所有内容,这里是 143 个字符


与行相同的功能

test_file = "random_test.txt"


def chunks(file_name, size=10000):
    with open(file_name) as f:
        while content := f.readline():
            for _ in range(size - 1):
                content += f.readline()

            yield content.splitlines()


if __name__ == '__main__':
    split_files = chunks(test_file)

    for chunk in split_files:
        print(len(chunk))


对于最后一个块,它将占用剩下的所有内容,此处 6479

test_file = r'random_test.txt'

def chunks(file_obj, size=10000):
    counter, chunks = 0, []
    for line in file_obj:
        if line == '\n':
            continue
        counter += 1
        chunks.append(line)
        if counter == size:
            yield chunks
            counter, chunks = 0, []
    file_obj.close()
    if counter:
        yield chunks

if __name__ == '__main__':
    split_files = chunks(open(test_file))
    for chunk in split_files:
        print(len(chunk))

最后输出一吨 10000 和 6479。并不是说 yield 关键字在这里真的更合适,但它在您使用它的地方绝对没有用。 yield 有助于创建惰性迭代器:仅当我们请求时才会从文件中读取新块。这样我们就不会读取内存中的完整文件。

使用 pandas.read_csv 和 chunksize 参数

简单地读取它
chunks = pd.read_csv('jan.dat', sep='\s+', chunksize=1000)

for chunk in chunks:
    # Process here

也可以使用pandas.DataFrame.to_sql推送到数据库