How to group columns and sum them, in a large CSV?

I have a very large CSV (hundreds of millions of rows) and I need to sum the Value column grouped by the ID, Location and Date columns.

My CSV looks something like this:

    ID Location        Date  Value
 1   1     Loc1  2022-01-27      5
 2   1     Loc1  2022-01-27      4
 3   1     Loc1  2022-01-28      7
 4   1     Loc2  2022-01-29      8
 5   2     Loc1  2022-01-27     11
 6   2     Loc2  2022-01-28      4
 7   2     Loc2  2022-01-29      6
 8   3     Loc1  2022-01-28      9
 9   3     Loc1  2022-01-28      9
10   3     Loc2  2022-01-29      1

The example input, once processed/summed and written to a new CSV, should look like this:

ID Location        Date  Value
1     Loc1  2022-01-27      9
1     Loc1  2022-01-28      7
1     Loc2  2022-01-29      8
2     Loc1  2022-01-27     11
2     Loc2  2022-01-28      4
2     Loc2  2022-01-29      6
3     Loc1  2022-01-28     18
3     Loc2  2022-01-29      1

I know that using df.groupby([columns]).sum() gives the result I want, but the CSV file is so large that I keep running into memory errors. I have tried other ways of reading/manipulating the CSV data, but still without success, so if anyone knows a way I can do this in Python without maxing out my memory, that would be great!

Note: I know there is an unnamed first column in my initial CSV; it is irrelevant and does not need to be in the output, but it doesn't matter if it is :)
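
For reference, the straightforward in-memory version that hits the memory error would look something like this (a minimal sketch, assuming the file is called input.csv and the unnamed first column is the index):

import pandas as pd

# Loads the entire CSV into memory at once -- this is what fails for very large files
df = pd.read_csv('input.csv', index_col=0)  # index_col=0 skips the unnamed first column

summed = df.groupby(['ID', 'Location', 'Date'], as_index=False)['Value'].sum()
summed.to_csv('output.csv', index=False)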

Have you tried:

output = []
for key, group in df.groupby(['ID', 'Location', 'Date']):
    output.append((*key, group['Value'].sum()))

pd.DataFrame(output, columns=['ID', 'Location', 'Date', 'Value']).to_csv("....csv", index=False)

Source:

The proper answer would probably be to use Dask, but you can do it with Pandas and chunks. The last_row variable holds the last row of the previous chunk, in case the first row of the current chunk has the same ID, Location and Date.

import pandas as pd

chunksize = 4  # Number of rows per chunk (tiny here for demonstration)
last_row = pd.DataFrame()  # Last row of the previous chunk

with open('data.csv') as reader, open('output.csv', 'w') as writer:

    # Write headers
    writer.write(reader.readline())
    reader.seek(0)

    for chunk in pd.read_csv(reader, chunksize=chunksize):
        df = pd.concat([last_row, chunk])
        df = df.groupby(['ID', 'Location', 'Date'], as_index=False)['Value'].sum()
        df, last_row = df.iloc[:-1], df.iloc[-1:]
        df.to_csv(writer, header=False, index=False)

    # Don't forget the last row!
    last_row.to_csv(writer, header=False, index=False)

output.csv的内容:

ID,Location,Date,Value
1,Loc1,2022-01-27,9
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,18
3,Loc2,2022-01-29,1
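
For reference, a minimal Dask sketch of the same aggregation could look like this (assuming the input file is named input.csv with no unnamed index column; treat it as a starting point rather than a tested solution):

import dask.dataframe as dd

# Dask reads the CSV lazily in partitions, so the whole file never has to fit in memory
df = dd.read_csv('input.csv')

# The grouped result is far smaller than the input, so it can be collected and written out
result = df.groupby(['ID', 'Location', 'Date'])['Value'].sum().reset_index()
result.to_csv('output.csv', single_file=True, index=False)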

If the rows to be combined are consecutive, the good old csv module lets you process a huge file one row at a time, so memory usage stays minimal.

Here you could use:

import csv

with open('input.csv') as fd, open('output.csv', 'w', newline='') as fdout:
    rd, wr = csv.reader(fd), csv.writer(fdout)
    _ = wr.writerow(next(rd))      # header line
    old = [None]*4
    for row in rd:
        row[3] = int(row[3])       # convert value field to integer
        if row[:3] == old[:3]:
            old[3] += row[3]       # sum the values of matching rows
        else:
            if old[0]:             # and write the concatenated row
                _ = wr.writerow(old)
            old = row
    if old[0]:                     # do not forget the last row...
        _ = wr.writerow(old)

With the input data shown, it gives the expected result:

ID,Location,Date,Value
1,Loc1,2022-01-27,9
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,18
3,Loc2,2022-01-29,1

Not as clean and tidy as the Pandas code, but it should handle files larger than the available memory without any problem.

You could use the built-in csv library and build up the output row by row. A Counter can be used to combine and sum rows with the same ID, Location and Date entries:

from collections import Counter
import csv

data = Counter()

with open('input.csv') as f_input:
    csv_input = csv.reader(f_input)
    header = next(csv_input)
    
    for row in csv_input:
        data[tuple(row[:3])] += int(row[3])

with open('output.csv', 'w', newline='') as f_output:
    csv_output = csv.writer(f_output)
    csv_output.writerow(header)

    for key, value in data.items():
        csv_output.writerow([*key, value])

Giving the output:

ID,Location,Date,Value
1,Loc1,2022-01-27,9
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,18
3,Loc2,2022-01-29,1

This avoids storing the input CSV in memory; only the output CSV data is held.


If that is also too large, you could write out data each time the ID column changes. This does, though, assume the input is in ID order:

from collections import Counter
import csv

def write_id(csv_output, data):
    for key, value in data.items():
        csv_output.writerow([*key, value])
    data.clear()


data = Counter()
current_id = None

with open('input.csv') as f_input, open('output.csv', 'w', newline='') as f_output:
    csv_input = csv.reader(f_input)
    csv_output = csv.writer(f_output)
    
    header = next(csv_input)
    csv_output.writerow(header)
    
    for row in csv_input:
        if current_id and row[0] != current_id:
            write_id(csv_output, data)
            
        data[tuple(row[:3])] += int(row[3])
        current_id = row[0]
        
    write_id(csv_output, data)        

This gives the same output for the example above.

There are already a number of answers that might do the trick: @MartinEvans and @Corralien both recommend breaking up / chunking the input and output. I'm particularly curious whether @MartinEvans's answer works within your memory constraints: it is by far the simplest and still-correct solution (as I see it).

If neither of those works, I think you're up against this question:

What makes a chunk such that all the ID/Loc/Date groups I need to count are contained in that chunk, so that no group crosses a chunk boundary and gets counted multiple times (ending up with smaller sub-sums instead of a single, true sum)?

In a comment on the OP you said the input is sorted by "week number". I think that is the deciding factor for when all the counts for a set of ID/Loc/Date groups are complete. When the reader crosses a week-group boundary, it knows it is "safe" to stop counting any groups encountered so far and can flush those counts to disk (to avoid holding too many counts in memory).

This solution relies on the input CSV being pre-sorted. If your input is a bit out of order, though, you could run this, test for duplicate groups, re-sort, and re-run it (I see this problem as one big, memory-constrained reducer); a small sketch of such a duplicate check follows the code below:

import csv
from collections import Counter
from datetime import datetime


# Get the data out...
out_csv = open('output.csv', 'w', newline='')
writer = csv.writer(out_csv)

def write_row(row):
    global writer
    writer.writerow(row)


# Don't let counter get too big (for memory)
def flush_counter(counter):
    for key, sum_ in counter.items():
        id_, loc, date = key
        write_row([id_, loc, date, sum_])


# You said "already grouped by week-number", so:
# -   read and sum your input CSV in chunks of "week (number) groups"
# -   once the reader reads past a week-group, it concludes week-group is finished
#     and flushes the counts for that week-group

last_wk_group = None
counter = Counter()

# Open input
with open('input.csv', newline='') as f:
    reader = csv.reader(f)

    # Copy header
    header = next(reader)
    write_row(header)

    for row in reader:
        # Get "base" values
        id_, loc, date = row[0:3]
        value = int(row[3])

        # 2022-01-27  ->  2022-04
        wk_group = datetime.strptime(date, r'%Y-%m-%d').strftime(r'%Y-%U')

        # Decide if last week-group has passed
        if wk_group != last_wk_group:
            flush_counter(counter)
            counter = Counter()
            last_wk_group = wk_group

        # Count/sum within this week-group
        key = (id_, loc, date)
        counter[key] += value


# Flush remaining week-group counts
flush_counter(counter)
out_csv.close()
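
The "test for duplicate groups" step mentioned above isn't part of the original code; a minimal sketch of it (assuming the output keeps the ID,Location,Date,Value layout, and noting that it holds one entry per output group in memory) could be:

import csv
from collections import Counter

# Count how often each (ID, Location, Date) key appears in the output;
# anything seen more than once was split across a week-group boundary
seen = Counter()
with open('output.csv', newline='') as f:
    reader = csv.reader(f)
    next(reader)  # skip header
    for row in reader:
        seen[tuple(row[:3])] += 1

duplicates = [key for key, count in seen.items() if count > 1]
print(duplicates if duplicates else 'No duplicate groups')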

As a basic test, I moved the first row of the sample input down to the last row, like @Corralien asked about:

ID,Location,Date,Value
1,Loc1,2022-01-27,5
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,9
3,Loc1,2022-01-28,9
3,Loc2,2022-01-29,1
1,Loc1,2022-01-27,4

and I still get the correct output (even in the correct order, since 1,Loc1,2022-01-27 appears first in the input):

ID,Location,Date,Value
1,Loc1,2022-01-27,9
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,18
3,Loc2,2022-01-29,1