在 Python 中组合异构 CSV 文件的最快且 I/O 有效的方法

Fastest & I/O efficient way to combine heterogeneous CSV files in Python

给定十个 1MB 的 csv 文件,每个文件的布局略有不同,我需要将它们组合成一个具有相同 header 的规范化单个文件。空字符串适用于空值。

列示例:

1. FIELD1, FIELD2, FIELD3
2. FIELD2, FIELD1, FIELD3
3. FIELD1, FIELD3, FIELD4
4. FIELD3, FIELD4, FIELD5, FIELD6
5. FIELD2

输出看起来像(虽然顺序不重要,但我的代码按发现的顺序排列它们):

FIELD1, FIELD2, FIELD3, FIELD4, FIELD5, FIELD6

所以基本上字段可以按任何顺序出现,字段可能会丢失,或者以前没有见过的新字段。所有这些都必须包含在输出文件中。无需连接,最终部分中的数据行数必须等于输出中的行数。

将10MB全部读入内存即可。不知何故使用 100MB 来做它不会。如果需要,您也可以一次打开所有文件。很多文件手,可用内存,但它会 运行ning 针对 NAS,因此它需要高效(没有太多 NAS 操作)。

我现在的方法是将每个文件读入列列表,在发现新列时构建新的列列表,然后将其全部写到一个文件中。不过,我希望有人有更聪明的东西,因为我在这个过程中遇到了瓶颈,所以任何缓解都是有帮助的。

如果有人想尝试,我有示例文件 here。我将 post 我当前的代码作为可能的答案。寻找最快的时间,当我 运行 在我的服务器上(很多内核,很多内存)使用本地磁盘时。

它不是超短之类的,但基本上我是将它们读入列存储然后将它们全部写出。我希望速度更快,速度相同,相同 i/o 并且内存更少也很好......但速度更快是最重要的。

import csv
from os.path import join
from collections import OrderedDict


# Accumulators
#columnstore = OrderedDict of tuples ( Data List, Starting rowcount)
columnstore = OrderedDict()
total_rowcount = 0

def flush_to_merged_csv(merged_filename,delimiter):

    with open(merged_filename,'w') as f:
        writer = csv.writer(f, delimiter=bytes(delimiter) )

        # Write the header first for all columns
        writer.writerow(columnstore.keys())

        # Write each row
        for rowidx in range(0,total_rowcount):

            # Assemble row from columnstore
            row = []
            for col in columnstore.keys():
                if columnstore[col][1] <= rowidx:
                    row.append(columnstore[col][0][rowidx - columnstore[col][1]])
                else:
                    row.append('')

            writer.writerow(row)


def combine(location, files, mergefile, delimiter):
    global total_rowcount

    for filename in files:

        with open(join(location,filename),'rb') as f:
            file_rowcount = 0
            reader = csv.reader( f, delimiter=bytes(delimiter) )

            # Get the column names.
            # Normalize the names (all upper, strip)
            columns = [ x.strip().upper() for x in reader.next() ]


            # Columnstore maintenance. Add new columns to columnstore
            for col in columns:
                if not columnstore.has_key(col):
                    columnstore[col] = ( [], total_rowcount )


            # Loop throught the remaining file, adding each cell to the proper columnstore
            for row in reader:
                field_count = len(row)
                total_rowcount += 1

                # Add the columns that exist to the columnstore.
                for columnidx in range(0,len(columns)):
                    # Handle missing trailing fields as empty
                    if columnidx >= field_count:
                        columnstore[columns[columnidx]][0].append('')
                    else:
                        columnstore[columns[columnidx]][0].append(row[columnidx])

                # Add emptry strings to any columnstores that don't exist in this file to keep them all in sync
                for colname in set(columnstore.keys()) - set(columns):
                    columnstore[colname][0].append('')

    flush_to_merged_csv(join(location,mergefile),delimiter)

combine( './', ['in1.csv','in2.csv','in3.csv','in4.csv','in5.csv','in6.csv','in7.csv','in8.csv','in9.csv','in10.csv'],'output.csv',',')

使用pandas library and the concat函数

import pandas
import glob
df = pandas.concat([pandas.read_csv(x) for x in glob.glob("in*.csv")])
df.to_csv("output.csv")

这是一个使用标准库模块的简单解决方案。这是 Python 3. 使用备用注释 with 行 Python 2:

import csv
import glob

rows = []
fields = set()

for filename in glob.glob('in*.csv'):
    #with open(filename,'rb') as f:
    with open(filename,newline='') as f:
        r = csv.DictReader(f)
        rows.extend(row for row in r)
        fields.update(r.fieldnames)

#with open('result.csv','wb') as f:
with open('result.csv','w',newline='') as f:
    w = csv.DictWriter(f,fieldnames=fields)
    w.writeheader()
    w.writerows(rows)

编辑

每个评论,添加文件名和行号:

import csv
import glob

rows = []
fields = set(['filename','lineno'])

for filename in glob.glob('in*.csv'):
    with open(filename,newline='') as f:
        r = csv.DictReader(f)
        for lineno,row in enumerate(r,1):
            row.update({'filename':filename,'lineno':lineno})
            rows.append(row)
        fields.update(r.fieldnames)

with open('result.csv','w',newline='') as f:
    w = csv.DictWriter(f,fieldnames=fields)
    w.writeheader()
    w.writerows(rows)

原来在我的系统上用了 8.8 秒。本次更新耗时10.6秒

另请注意,如果您在传递给 DictWriter 之前订购 fields,您可以按您想要的顺序放置列。

csv.DictReader() and csv.DictWriter() objects 使用 two-pass 方法。第一个收集所有文件中使用的 headers 集,然后第二个基于 headers.

跨数据复制

收集 headers 就像访问 reader objects 上的 fieldnames 属性一样简单:

import csv
import glob

files = []
readers = []
fields = set()

try:
    for filename in glob.glob('in*.csv'):
        try:
            fileobj = open(filename, 'rb')
        except IOError:
            print "Failed to open {}".format(filename)
            continue
        files.append(fileobj)  # for later closing

        reader = csv.DictReader(fileobj)
        fields.update(reader.fieldnames)  # reads the first row
        readers.append(reader)

    with open('result.csv', 'wb') as outf:
        writer = csv.DictWriter(outf, fieldnames=sorted(fields))
        writer.writeheader()
        for reader in readers:
            # copy across rows; missing fields will be left blank
            for row in reader:
                writer.writerow(row)
finally:
    # close out open file objects
    for fileobj in files:
        fileobj.close()

每个 reader 生成一个包含所有字段子集的字典,但是 DictWriter 将使用 restval 参数的值(当省略时默认为 ''我在这里做了)填写每个缺失键的值。

这里我假设是Python2;如果这是 Python 3,您可以使用 ExitStack() 来管理 reader 的打开文件;从文件模式中省略 b 并向所有打开的调用添加一个 newline='' 参数以将换行符处理留给 CSV 模块。

以上代码只使用了一个缓冲区来读写行;行大多一次从一个打开的 reader 移动到编写器。

不幸的是,我们不能将 writer.writerows(reader) 用作 Python 错误跟踪器中的 DictWriter.writerows() implementation first converts everything in reader to a list of lists before passing it on to the underlying csv.writer.writerows() method, see issue 23495

@MartijnPieter 的回答非常有帮助,但由于在阅读内容时在阅读 headers 到 re-use 后保持文件打开,它在 ~255 个文件处崩溃(我发现)。我需要合并 ~32,000 个文件,所以稍微重写了他的代码以免崩溃。我还选择将它拆分为两个函数,这样我就可以在两者之间分析列 headers。

def collectColumnNamesInCSVs():
    fields = set()

    for filename in glob.glob('//path//to//files/*.csv'):
        try:
            fileobj = open(filename, 'rb')
        except IOError:
            print "Failed to open {}".format(filename)
            continue

        reader = csv.DictReader(fileobj)
        fields.update(reader.fieldnames)  # reads the first row
        fileobj.close()

    return fields


def combineCSVs(fields):
    with open('result.csv', 'wb') as outf:
        writer = csv.DictWriter(outf, fieldnames=sorted(fields))
        writer.writeheader()

        for filename in glob.glob('//path//to//files/*.csv'):
            try:
                fileobj = open(filename, 'rb')
            except IOError:
                print "Failed to open {}".format(filename)
                continue

            reader = csv.DictReader(fileobj)

            for row in reader:
                writer.writerow(row)

            fileobj.close()

    outf.close()

当打开种类繁多的 CSV(<1k - 700k;每列 20-60 个混合列;总计约 130 headers)时,第二阶段每 1000 个文件花费约 1 分钟1.4GHz MacBook Air。不错,比 Pandas.

快了几个数量级