将 CSV 文件分成相等的部分?
Splitting a CSV file into equal parts?
我有一个很大的 CSV 文件,我想将其拆分为与系统中的 CPU 个核心数相等的数字。然后我想使用多进程让所有核心一起处理文件。但是,我什至无法将文件分成几部分。我仔细查看了 google,发现了一些示例代码似乎可以满足我的要求。这是我目前所拥有的:
def split(infilename, num_cpus=multiprocessing.cpu_count()):
READ_BUFFER = 2**13
total_file_size = os.path.getsize(infilename)
print total_file_size
files = list()
with open(infilename, 'rb') as infile:
for i in xrange(num_cpus):
files.append(tempfile.TemporaryFile())
this_file_size = 0
while this_file_size < 1.0 * total_file_size / num_cpus:
files[-1].write(infile.read(READ_BUFFER))
this_file_size += READ_BUFFER
files[-1].write(infile.readline()) # get the possible remainder
files[-1].seek(0, 0)
return files
files = split("sample_simple.csv")
print len(files)
for ifile in files:
reader = csv.reader(ifile)
for row in reader:
print row
这两个打印件显示了正确的文件大小,并且它被分成了 4 个部分(我的系统有 4 个 CPU 核心)。
但是,打印每一部分中所有行的代码的最后一部分给出了错误:
for row in reader:
_csv.Error: line contains NULL byte
我尝试在没有 运行 拆分函数的情况下打印行,它正确打印了所有值。我怀疑拆分函数向生成的 4 个文件片段添加了一些 NULL 字节,但我不确定为什么。
有谁知道这种拆分文件的方法是否正确且快速?我只想要 csv.reader.
可以成功读取的结果片段
正如我在评论中所说,csv 文件需要按行(或行)边界拆分。您的代码不会这样做,并且可能会在一个中间的某个地方将它们分解 — 我怀疑这是您 _csv.Error
.
的原因
以下通过将输入文件处理为一系列行来避免这样做。我已经测试过它,它似乎可以独立工作,因为它将示例文件分成 大约 个大小相等的块,因为整个行数不太可能完全适合一个大块。
更新
这是比我最初发布的代码显着 更快的版本。改进是因为它现在使用临时文件自己的 tell()
方法来确定文件在写入时不断变化的长度,而不是调用 os.path.getsize()
,这样就不需要 flush()
文件并在写入每一行后对其调用 os.fsync()
。
import csv
import multiprocessing
import os
import tempfile
def split(infilename, num_chunks=multiprocessing.cpu_count()):
READ_BUFFER = 2**13
in_file_size = os.path.getsize(infilename)
print 'in_file_size:', in_file_size
chunk_size = in_file_size // num_chunks
print 'target chunk_size:', chunk_size
files = []
with open(infilename, 'rb', READ_BUFFER) as infile:
for _ in xrange(num_chunks):
temp_file = tempfile.TemporaryFile()
while temp_file.tell() < chunk_size:
try:
temp_file.write(infile.next())
except StopIteration: # end of infile
break
temp_file.seek(0) # rewind
files.append(temp_file)
return files
files = split("sample_simple.csv", num_chunks=4)
print 'number of files created: {}'.format(len(files))
for i, ifile in enumerate(files, start=1):
print 'size of temp file {}: {}'.format(i, os.path.getsize(ifile.name))
print 'contents of file {}:'.format(i)
reader = csv.reader(ifile)
for row in reader:
print row
print ''
我有一个很大的 CSV 文件,我想将其拆分为与系统中的 CPU 个核心数相等的数字。然后我想使用多进程让所有核心一起处理文件。但是,我什至无法将文件分成几部分。我仔细查看了 google,发现了一些示例代码似乎可以满足我的要求。这是我目前所拥有的:
def split(infilename, num_cpus=multiprocessing.cpu_count()):
READ_BUFFER = 2**13
total_file_size = os.path.getsize(infilename)
print total_file_size
files = list()
with open(infilename, 'rb') as infile:
for i in xrange(num_cpus):
files.append(tempfile.TemporaryFile())
this_file_size = 0
while this_file_size < 1.0 * total_file_size / num_cpus:
files[-1].write(infile.read(READ_BUFFER))
this_file_size += READ_BUFFER
files[-1].write(infile.readline()) # get the possible remainder
files[-1].seek(0, 0)
return files
files = split("sample_simple.csv")
print len(files)
for ifile in files:
reader = csv.reader(ifile)
for row in reader:
print row
这两个打印件显示了正确的文件大小,并且它被分成了 4 个部分(我的系统有 4 个 CPU 核心)。
但是,打印每一部分中所有行的代码的最后一部分给出了错误:
for row in reader:
_csv.Error: line contains NULL byte
我尝试在没有 运行 拆分函数的情况下打印行,它正确打印了所有值。我怀疑拆分函数向生成的 4 个文件片段添加了一些 NULL 字节,但我不确定为什么。
有谁知道这种拆分文件的方法是否正确且快速?我只想要 csv.reader.
可以成功读取的结果片段正如我在评论中所说,csv 文件需要按行(或行)边界拆分。您的代码不会这样做,并且可能会在一个中间的某个地方将它们分解 — 我怀疑这是您 _csv.Error
.
以下通过将输入文件处理为一系列行来避免这样做。我已经测试过它,它似乎可以独立工作,因为它将示例文件分成 大约 个大小相等的块,因为整个行数不太可能完全适合一个大块。
更新
这是比我最初发布的代码显着 更快的版本。改进是因为它现在使用临时文件自己的 tell()
方法来确定文件在写入时不断变化的长度,而不是调用 os.path.getsize()
,这样就不需要 flush()
文件并在写入每一行后对其调用 os.fsync()
。
import csv
import multiprocessing
import os
import tempfile
def split(infilename, num_chunks=multiprocessing.cpu_count()):
READ_BUFFER = 2**13
in_file_size = os.path.getsize(infilename)
print 'in_file_size:', in_file_size
chunk_size = in_file_size // num_chunks
print 'target chunk_size:', chunk_size
files = []
with open(infilename, 'rb', READ_BUFFER) as infile:
for _ in xrange(num_chunks):
temp_file = tempfile.TemporaryFile()
while temp_file.tell() < chunk_size:
try:
temp_file.write(infile.next())
except StopIteration: # end of infile
break
temp_file.seek(0) # rewind
files.append(temp_file)
return files
files = split("sample_simple.csv", num_chunks=4)
print 'number of files created: {}'.format(len(files))
for i, ifile in enumerate(files, start=1):
print 'size of temp file {}: {}'.format(i, os.path.getsize(ifile.name))
print 'contents of file {}:'.format(i)
reader = csv.reader(ifile)
for row in reader:
print row
print ''