将大文件拆分成块
Splitting a large file into chunks
我有一个包含 7946479 条记录的文件,我想逐行读取文件并插入到数据库 (sqlite) 中。我的第一种方法是打开文件逐行读取记录并同时插入数据库,因为它处理大量数据需要很长时间 time.I 想改变这种天真的方法所以当我搜索时在互联网上我看到了这个 [python-csv-to-sqlite][1] ,他们在 csv 文件中有数据,但我的文件是 dat 格式但是我喜欢这个问题的答案,所以现在我正在尝试像在解决方案中那样做。
他们使用的方法就像是首先将整个文件分成块,然后进行数据库事务,而不是一次写入每条记录。
所以我开始编写将文件分成块的代码这是我的代码,
file = r'files/jan.dat'
test_file = r'random_test.txt'
def chunks(file_obj, size=10000):
counter = 0
file_chunks = []
temp_chunks = []
for line in file_obj:
if line == '\n':
continue
if counter != size:
temp_chunks.append(line)
counter += 1
else:
file_chunks.append(temp_chunks)
temp_chunks = []
counter = 0
file_obj.close()
if len(temp_chunks) != 0:
file_chunks.append(temp_chunks)
yield file_chunks
if __name__ == '__main__':
split_files = chunks(open(test_file))
for chunk in split_files:
print(len(chunk))
输出是 795,但我想要的是将整个文件拆分成大小为 10000 的块
我不知道这里出了什么问题,我不能在这里分享我的整个文件所以为了测试可以使用这段代码生成一个包含 7946479 行的文件
TEXT = 'Hello world'
FILE_LENGTH = 7946479
counter = 0
with open(r'random_test.txt', 'w') as f:
for _ in range(FILE_LENGTH):
f.write(f"{TEXT}\n")
这是我的原始文件的样子(文件格式是 dat)
lat lon day mon t2m rh2m sf ws
5 60 1 1 299.215 94.737 209.706 5.213
5 60.25 1 1 299.25 94.728 208.868 5.137
5 60.5 1 1 299.295 94.695 207.53 5.032
5 60.75 1 1 299.353 94.623 206.18 4.945
5 61 1 1 299.417 94.522 204.907 4.833
5 61.25 1 1 299.447 94.503 204.219 4.757
5 61.5 1 1 299.448 94.525 203.933 4.68
5 61.75 1 1 299.443 94.569 204.487 4.584
5 62 1 1 299.44 94.617 204.067 4.464
作为解决任务耗时过长问题的方法,我建议使用多处理而不是将文本分块(因为它会花费同样长的时间,但需要更多步骤)。使用 multiprocessing 库允许多个处理核心并行执行相同的任务,从而缩短 运行 时间。这是一个例子。
import multiprocessing as mp
# Step 1: Use multiprocessing.Pool() and specify number of cores to use (here I use 4).
pool = mp.Pool(4)
# Step 2: Use pool.starmap which takes a multiple iterable arguments
results = pool.starmap(My_Function, [(Parameter1, Parameter2, Parameter3) for i in data])
# Step 3: Don't forget to close
pool.close()
分块文件的一种简单方法是使用 f.read(size)
,直到没有内容为止。但是,此方法适用于字符编号而不是行。
test_file = 'random_test.txt'
def chunks(file_name, size=10000):
with open(file_name) as f:
while content := f.read(size):
yield content
if __name__ == '__main__':
split_files = chunks(test_file)
for chunk in split_files:
print(len(chunk))
对于最后一个块,它将占用剩下的所有内容,这里是 143
个字符
与行相同的功能
test_file = "random_test.txt"
def chunks(file_name, size=10000):
with open(file_name) as f:
while content := f.readline():
for _ in range(size - 1):
content += f.readline()
yield content.splitlines()
if __name__ == '__main__':
split_files = chunks(test_file)
for chunk in split_files:
print(len(chunk))
对于最后一个块,它将占用剩下的所有内容,此处 6479
行
test_file = r'random_test.txt'
def chunks(file_obj, size=10000):
counter, chunks = 0, []
for line in file_obj:
if line == '\n':
continue
counter += 1
chunks.append(line)
if counter == size:
yield chunks
counter, chunks = 0, []
file_obj.close()
if counter:
yield chunks
if __name__ == '__main__':
split_files = chunks(open(test_file))
for chunk in split_files:
print(len(chunk))
最后输出一吨 10000 和 6479。并不是说 yield
关键字在这里真的更合适,但它在您使用它的地方绝对没有用。 yield
有助于创建惰性迭代器:仅当我们请求时才会从文件中读取新块。这样我们就不会读取内存中的完整文件。
使用 pandas.read_csv
和 chunksize 参数
简单地读取它
chunks = pd.read_csv('jan.dat', sep='\s+', chunksize=1000)
for chunk in chunks:
# Process here
也可以使用pandas.DataFrame.to_sql推送到数据库
我有一个包含 7946479 条记录的文件,我想逐行读取文件并插入到数据库 (sqlite) 中。我的第一种方法是打开文件逐行读取记录并同时插入数据库,因为它处理大量数据需要很长时间 time.I 想改变这种天真的方法所以当我搜索时在互联网上我看到了这个 [python-csv-to-sqlite][1] ,他们在 csv 文件中有数据,但我的文件是 dat 格式但是我喜欢这个问题的答案,所以现在我正在尝试像在解决方案中那样做。
他们使用的方法就像是首先将整个文件分成块,然后进行数据库事务,而不是一次写入每条记录。
所以我开始编写将文件分成块的代码这是我的代码,
file = r'files/jan.dat'
test_file = r'random_test.txt'
def chunks(file_obj, size=10000):
counter = 0
file_chunks = []
temp_chunks = []
for line in file_obj:
if line == '\n':
continue
if counter != size:
temp_chunks.append(line)
counter += 1
else:
file_chunks.append(temp_chunks)
temp_chunks = []
counter = 0
file_obj.close()
if len(temp_chunks) != 0:
file_chunks.append(temp_chunks)
yield file_chunks
if __name__ == '__main__':
split_files = chunks(open(test_file))
for chunk in split_files:
print(len(chunk))
输出是 795,但我想要的是将整个文件拆分成大小为 10000 的块
我不知道这里出了什么问题,我不能在这里分享我的整个文件所以为了测试可以使用这段代码生成一个包含 7946479 行的文件
TEXT = 'Hello world'
FILE_LENGTH = 7946479
counter = 0
with open(r'random_test.txt', 'w') as f:
for _ in range(FILE_LENGTH):
f.write(f"{TEXT}\n")
这是我的原始文件的样子(文件格式是 dat)
lat lon day mon t2m rh2m sf ws
5 60 1 1 299.215 94.737 209.706 5.213
5 60.25 1 1 299.25 94.728 208.868 5.137
5 60.5 1 1 299.295 94.695 207.53 5.032
5 60.75 1 1 299.353 94.623 206.18 4.945
5 61 1 1 299.417 94.522 204.907 4.833
5 61.25 1 1 299.447 94.503 204.219 4.757
5 61.5 1 1 299.448 94.525 203.933 4.68
5 61.75 1 1 299.443 94.569 204.487 4.584
5 62 1 1 299.44 94.617 204.067 4.464
作为解决任务耗时过长问题的方法,我建议使用多处理而不是将文本分块(因为它会花费同样长的时间,但需要更多步骤)。使用 multiprocessing 库允许多个处理核心并行执行相同的任务,从而缩短 运行 时间。这是一个例子。
import multiprocessing as mp
# Step 1: Use multiprocessing.Pool() and specify number of cores to use (here I use 4).
pool = mp.Pool(4)
# Step 2: Use pool.starmap which takes a multiple iterable arguments
results = pool.starmap(My_Function, [(Parameter1, Parameter2, Parameter3) for i in data])
# Step 3: Don't forget to close
pool.close()
分块文件的一种简单方法是使用 f.read(size)
,直到没有内容为止。但是,此方法适用于字符编号而不是行。
test_file = 'random_test.txt'
def chunks(file_name, size=10000):
with open(file_name) as f:
while content := f.read(size):
yield content
if __name__ == '__main__':
split_files = chunks(test_file)
for chunk in split_files:
print(len(chunk))
对于最后一个块,它将占用剩下的所有内容,这里是 143
个字符
与行相同的功能
test_file = "random_test.txt"
def chunks(file_name, size=10000):
with open(file_name) as f:
while content := f.readline():
for _ in range(size - 1):
content += f.readline()
yield content.splitlines()
if __name__ == '__main__':
split_files = chunks(test_file)
for chunk in split_files:
print(len(chunk))
对于最后一个块,它将占用剩下的所有内容,此处 6479
行
test_file = r'random_test.txt'
def chunks(file_obj, size=10000):
counter, chunks = 0, []
for line in file_obj:
if line == '\n':
continue
counter += 1
chunks.append(line)
if counter == size:
yield chunks
counter, chunks = 0, []
file_obj.close()
if counter:
yield chunks
if __name__ == '__main__':
split_files = chunks(open(test_file))
for chunk in split_files:
print(len(chunk))
最后输出一吨 10000 和 6479。并不是说 yield
关键字在这里真的更合适,但它在您使用它的地方绝对没有用。 yield
有助于创建惰性迭代器:仅当我们请求时才会从文件中读取新块。这样我们就不会读取内存中的完整文件。
使用 pandas.read_csv
和 chunksize 参数
chunks = pd.read_csv('jan.dat', sep='\s+', chunksize=1000)
for chunk in chunks:
# Process here
也可以使用pandas.DataFrame.to_sql推送到数据库