在 Python 中组合异构 CSV 文件的最快且 I/O 有效的方法
Fastest & I/O efficient way to combine heterogeneous CSV files in Python
给定十个 1MB 的 csv 文件,每个文件的布局略有不同,我需要将它们组合成一个具有相同 header 的规范化单个文件。空字符串适用于空值。
列示例:
1. FIELD1, FIELD2, FIELD3
2. FIELD2, FIELD1, FIELD3
3. FIELD1, FIELD3, FIELD4
4. FIELD3, FIELD4, FIELD5, FIELD6
5. FIELD2
输出看起来像(虽然顺序不重要,但我的代码按发现的顺序排列它们):
FIELD1, FIELD2, FIELD3, FIELD4, FIELD5, FIELD6
所以基本上字段可以按任何顺序出现,字段可能会丢失,或者以前没有见过的新字段。所有这些都必须包含在输出文件中。无需连接,最终部分中的数据行数必须等于输出中的行数。
将10MB全部读入内存即可。不知何故使用 100MB 来做它不会。如果需要,您也可以一次打开所有文件。很多文件手,可用内存,但它会 运行ning 针对 NAS,因此它需要高效(没有太多 NAS 操作)。
我现在的方法是将每个文件读入列列表,在发现新列时构建新的列列表,然后将其全部写到一个文件中。不过,我希望有人有更聪明的东西,因为我在这个过程中遇到了瓶颈,所以任何缓解都是有帮助的。
如果有人想尝试,我有示例文件 here。我将 post 我当前的代码作为可能的答案。寻找最快的时间,当我 运行 在我的服务器上(很多内核,很多内存)使用本地磁盘时。
它不是超短之类的,但基本上我是将它们读入列存储然后将它们全部写出。我希望速度更快,速度相同,相同 i/o 并且内存更少也很好......但速度更快是最重要的。
import csv
from os.path import join
from collections import OrderedDict
# Accumulators
#columnstore = OrderedDict of tuples ( Data List, Starting rowcount)
columnstore = OrderedDict()
total_rowcount = 0
def flush_to_merged_csv(merged_filename,delimiter):
with open(merged_filename,'w') as f:
writer = csv.writer(f, delimiter=bytes(delimiter) )
# Write the header first for all columns
writer.writerow(columnstore.keys())
# Write each row
for rowidx in range(0,total_rowcount):
# Assemble row from columnstore
row = []
for col in columnstore.keys():
if columnstore[col][1] <= rowidx:
row.append(columnstore[col][0][rowidx - columnstore[col][1]])
else:
row.append('')
writer.writerow(row)
def combine(location, files, mergefile, delimiter):
global total_rowcount
for filename in files:
with open(join(location,filename),'rb') as f:
file_rowcount = 0
reader = csv.reader( f, delimiter=bytes(delimiter) )
# Get the column names.
# Normalize the names (all upper, strip)
columns = [ x.strip().upper() for x in reader.next() ]
# Columnstore maintenance. Add new columns to columnstore
for col in columns:
if not columnstore.has_key(col):
columnstore[col] = ( [], total_rowcount )
# Loop throught the remaining file, adding each cell to the proper columnstore
for row in reader:
field_count = len(row)
total_rowcount += 1
# Add the columns that exist to the columnstore.
for columnidx in range(0,len(columns)):
# Handle missing trailing fields as empty
if columnidx >= field_count:
columnstore[columns[columnidx]][0].append('')
else:
columnstore[columns[columnidx]][0].append(row[columnidx])
# Add emptry strings to any columnstores that don't exist in this file to keep them all in sync
for colname in set(columnstore.keys()) - set(columns):
columnstore[colname][0].append('')
flush_to_merged_csv(join(location,mergefile),delimiter)
combine( './', ['in1.csv','in2.csv','in3.csv','in4.csv','in5.csv','in6.csv','in7.csv','in8.csv','in9.csv','in10.csv'],'output.csv',',')
使用pandas library and the concat
函数
import pandas
import glob
df = pandas.concat([pandas.read_csv(x) for x in glob.glob("in*.csv")])
df.to_csv("output.csv")
这是一个使用标准库模块的简单解决方案。这是 Python 3. 使用备用注释 with
行 Python 2:
import csv
import glob
rows = []
fields = set()
for filename in glob.glob('in*.csv'):
#with open(filename,'rb') as f:
with open(filename,newline='') as f:
r = csv.DictReader(f)
rows.extend(row for row in r)
fields.update(r.fieldnames)
#with open('result.csv','wb') as f:
with open('result.csv','w',newline='') as f:
w = csv.DictWriter(f,fieldnames=fields)
w.writeheader()
w.writerows(rows)
编辑
每个评论,添加文件名和行号:
import csv
import glob
rows = []
fields = set(['filename','lineno'])
for filename in glob.glob('in*.csv'):
with open(filename,newline='') as f:
r = csv.DictReader(f)
for lineno,row in enumerate(r,1):
row.update({'filename':filename,'lineno':lineno})
rows.append(row)
fields.update(r.fieldnames)
with open('result.csv','w',newline='') as f:
w = csv.DictWriter(f,fieldnames=fields)
w.writeheader()
w.writerows(rows)
原来在我的系统上用了 8.8 秒。本次更新耗时10.6秒
另请注意,如果您在传递给 DictWriter
之前订购 fields
,您可以按您想要的顺序放置列。
对 csv.DictReader()
and csv.DictWriter()
objects 使用 two-pass 方法。第一个收集所有文件中使用的 headers 集,然后第二个基于 headers.
跨数据复制
收集 headers 就像访问 reader objects 上的 fieldnames
属性一样简单:
import csv
import glob
files = []
readers = []
fields = set()
try:
for filename in glob.glob('in*.csv'):
try:
fileobj = open(filename, 'rb')
except IOError:
print "Failed to open {}".format(filename)
continue
files.append(fileobj) # for later closing
reader = csv.DictReader(fileobj)
fields.update(reader.fieldnames) # reads the first row
readers.append(reader)
with open('result.csv', 'wb') as outf:
writer = csv.DictWriter(outf, fieldnames=sorted(fields))
writer.writeheader()
for reader in readers:
# copy across rows; missing fields will be left blank
for row in reader:
writer.writerow(row)
finally:
# close out open file objects
for fileobj in files:
fileobj.close()
每个 reader 生成一个包含所有字段子集的字典,但是 DictWriter
将使用 restval
参数的值(当省略时默认为 ''
我在这里做了)填写每个缺失键的值。
这里我假设是Python2;如果这是 Python 3,您可以使用 ExitStack()
来管理 reader 的打开文件;从文件模式中省略 b
并向所有打开的调用添加一个 newline=''
参数以将换行符处理留给 CSV 模块。
以上代码只使用了一个缓冲区来读写行;行大多一次从一个打开的 reader 移动到编写器。
不幸的是,我们不能将 writer.writerows(reader)
用作 Python 错误跟踪器中的 DictWriter.writerows()
implementation first converts everything in reader
to a list of lists before passing it on to the underlying csv.writer.writerows()
method, see issue 23495。
@MartijnPieter 的回答非常有帮助,但由于在阅读内容时在阅读 headers 到 re-use 后保持文件打开,它在 ~255 个文件处崩溃(我发现)。我需要合并 ~32,000 个文件,所以稍微重写了他的代码以免崩溃。我还选择将它拆分为两个函数,这样我就可以在两者之间分析列 headers。
def collectColumnNamesInCSVs():
fields = set()
for filename in glob.glob('//path//to//files/*.csv'):
try:
fileobj = open(filename, 'rb')
except IOError:
print "Failed to open {}".format(filename)
continue
reader = csv.DictReader(fileobj)
fields.update(reader.fieldnames) # reads the first row
fileobj.close()
return fields
def combineCSVs(fields):
with open('result.csv', 'wb') as outf:
writer = csv.DictWriter(outf, fieldnames=sorted(fields))
writer.writeheader()
for filename in glob.glob('//path//to//files/*.csv'):
try:
fileobj = open(filename, 'rb')
except IOError:
print "Failed to open {}".format(filename)
continue
reader = csv.DictReader(fileobj)
for row in reader:
writer.writerow(row)
fileobj.close()
outf.close()
当打开种类繁多的 CSV(<1k - 700k;每列 20-60 个混合列;总计约 130 headers)时,第二阶段每 1000 个文件花费约 1 分钟1.4GHz MacBook Air。不错,比 Pandas.
快了几个数量级
给定十个 1MB 的 csv 文件,每个文件的布局略有不同,我需要将它们组合成一个具有相同 header 的规范化单个文件。空字符串适用于空值。
列示例:
1. FIELD1, FIELD2, FIELD3
2. FIELD2, FIELD1, FIELD3
3. FIELD1, FIELD3, FIELD4
4. FIELD3, FIELD4, FIELD5, FIELD6
5. FIELD2
输出看起来像(虽然顺序不重要,但我的代码按发现的顺序排列它们):
FIELD1, FIELD2, FIELD3, FIELD4, FIELD5, FIELD6
所以基本上字段可以按任何顺序出现,字段可能会丢失,或者以前没有见过的新字段。所有这些都必须包含在输出文件中。无需连接,最终部分中的数据行数必须等于输出中的行数。
将10MB全部读入内存即可。不知何故使用 100MB 来做它不会。如果需要,您也可以一次打开所有文件。很多文件手,可用内存,但它会 运行ning 针对 NAS,因此它需要高效(没有太多 NAS 操作)。
我现在的方法是将每个文件读入列列表,在发现新列时构建新的列列表,然后将其全部写到一个文件中。不过,我希望有人有更聪明的东西,因为我在这个过程中遇到了瓶颈,所以任何缓解都是有帮助的。
如果有人想尝试,我有示例文件 here。我将 post 我当前的代码作为可能的答案。寻找最快的时间,当我 运行 在我的服务器上(很多内核,很多内存)使用本地磁盘时。
它不是超短之类的,但基本上我是将它们读入列存储然后将它们全部写出。我希望速度更快,速度相同,相同 i/o 并且内存更少也很好......但速度更快是最重要的。
import csv
from os.path import join
from collections import OrderedDict
# Accumulators
#columnstore = OrderedDict of tuples ( Data List, Starting rowcount)
columnstore = OrderedDict()
total_rowcount = 0
def flush_to_merged_csv(merged_filename,delimiter):
with open(merged_filename,'w') as f:
writer = csv.writer(f, delimiter=bytes(delimiter) )
# Write the header first for all columns
writer.writerow(columnstore.keys())
# Write each row
for rowidx in range(0,total_rowcount):
# Assemble row from columnstore
row = []
for col in columnstore.keys():
if columnstore[col][1] <= rowidx:
row.append(columnstore[col][0][rowidx - columnstore[col][1]])
else:
row.append('')
writer.writerow(row)
def combine(location, files, mergefile, delimiter):
global total_rowcount
for filename in files:
with open(join(location,filename),'rb') as f:
file_rowcount = 0
reader = csv.reader( f, delimiter=bytes(delimiter) )
# Get the column names.
# Normalize the names (all upper, strip)
columns = [ x.strip().upper() for x in reader.next() ]
# Columnstore maintenance. Add new columns to columnstore
for col in columns:
if not columnstore.has_key(col):
columnstore[col] = ( [], total_rowcount )
# Loop throught the remaining file, adding each cell to the proper columnstore
for row in reader:
field_count = len(row)
total_rowcount += 1
# Add the columns that exist to the columnstore.
for columnidx in range(0,len(columns)):
# Handle missing trailing fields as empty
if columnidx >= field_count:
columnstore[columns[columnidx]][0].append('')
else:
columnstore[columns[columnidx]][0].append(row[columnidx])
# Add emptry strings to any columnstores that don't exist in this file to keep them all in sync
for colname in set(columnstore.keys()) - set(columns):
columnstore[colname][0].append('')
flush_to_merged_csv(join(location,mergefile),delimiter)
combine( './', ['in1.csv','in2.csv','in3.csv','in4.csv','in5.csv','in6.csv','in7.csv','in8.csv','in9.csv','in10.csv'],'output.csv',',')
使用pandas library and the concat
函数
import pandas
import glob
df = pandas.concat([pandas.read_csv(x) for x in glob.glob("in*.csv")])
df.to_csv("output.csv")
这是一个使用标准库模块的简单解决方案。这是 Python 3. 使用备用注释 with
行 Python 2:
import csv
import glob
rows = []
fields = set()
for filename in glob.glob('in*.csv'):
#with open(filename,'rb') as f:
with open(filename,newline='') as f:
r = csv.DictReader(f)
rows.extend(row for row in r)
fields.update(r.fieldnames)
#with open('result.csv','wb') as f:
with open('result.csv','w',newline='') as f:
w = csv.DictWriter(f,fieldnames=fields)
w.writeheader()
w.writerows(rows)
编辑
每个评论,添加文件名和行号:
import csv
import glob
rows = []
fields = set(['filename','lineno'])
for filename in glob.glob('in*.csv'):
with open(filename,newline='') as f:
r = csv.DictReader(f)
for lineno,row in enumerate(r,1):
row.update({'filename':filename,'lineno':lineno})
rows.append(row)
fields.update(r.fieldnames)
with open('result.csv','w',newline='') as f:
w = csv.DictWriter(f,fieldnames=fields)
w.writeheader()
w.writerows(rows)
原来在我的系统上用了 8.8 秒。本次更新耗时10.6秒
另请注意,如果您在传递给 DictWriter
之前订购 fields
,您可以按您想要的顺序放置列。
对 csv.DictReader()
and csv.DictWriter()
objects 使用 two-pass 方法。第一个收集所有文件中使用的 headers 集,然后第二个基于 headers.
收集 headers 就像访问 reader objects 上的 fieldnames
属性一样简单:
import csv
import glob
files = []
readers = []
fields = set()
try:
for filename in glob.glob('in*.csv'):
try:
fileobj = open(filename, 'rb')
except IOError:
print "Failed to open {}".format(filename)
continue
files.append(fileobj) # for later closing
reader = csv.DictReader(fileobj)
fields.update(reader.fieldnames) # reads the first row
readers.append(reader)
with open('result.csv', 'wb') as outf:
writer = csv.DictWriter(outf, fieldnames=sorted(fields))
writer.writeheader()
for reader in readers:
# copy across rows; missing fields will be left blank
for row in reader:
writer.writerow(row)
finally:
# close out open file objects
for fileobj in files:
fileobj.close()
每个 reader 生成一个包含所有字段子集的字典,但是 DictWriter
将使用 restval
参数的值(当省略时默认为 ''
我在这里做了)填写每个缺失键的值。
这里我假设是Python2;如果这是 Python 3,您可以使用 ExitStack()
来管理 reader 的打开文件;从文件模式中省略 b
并向所有打开的调用添加一个 newline=''
参数以将换行符处理留给 CSV 模块。
以上代码只使用了一个缓冲区来读写行;行大多一次从一个打开的 reader 移动到编写器。
不幸的是,我们不能将 writer.writerows(reader)
用作 Python 错误跟踪器中的 DictWriter.writerows()
implementation first converts everything in reader
to a list of lists before passing it on to the underlying csv.writer.writerows()
method, see issue 23495。
@MartijnPieter 的回答非常有帮助,但由于在阅读内容时在阅读 headers 到 re-use 后保持文件打开,它在 ~255 个文件处崩溃(我发现)。我需要合并 ~32,000 个文件,所以稍微重写了他的代码以免崩溃。我还选择将它拆分为两个函数,这样我就可以在两者之间分析列 headers。
def collectColumnNamesInCSVs():
fields = set()
for filename in glob.glob('//path//to//files/*.csv'):
try:
fileobj = open(filename, 'rb')
except IOError:
print "Failed to open {}".format(filename)
continue
reader = csv.DictReader(fileobj)
fields.update(reader.fieldnames) # reads the first row
fileobj.close()
return fields
def combineCSVs(fields):
with open('result.csv', 'wb') as outf:
writer = csv.DictWriter(outf, fieldnames=sorted(fields))
writer.writeheader()
for filename in glob.glob('//path//to//files/*.csv'):
try:
fileobj = open(filename, 'rb')
except IOError:
print "Failed to open {}".format(filename)
continue
reader = csv.DictReader(fileobj)
for row in reader:
writer.writerow(row)
fileobj.close()
outf.close()
当打开种类繁多的 CSV(<1k - 700k;每列 20-60 个混合列;总计约 130 headers)时,第二阶段每 1000 个文件花费约 1 分钟1.4GHz MacBook Air。不错,比 Pandas.
快了几个数量级