How to group columns and sum them, in a large CSV?
I have a large CSV (hundreds of millions of rows) and I need to sum the Value column grouped by the ID, Location and Date columns.
My CSV looks similar to this:
ID Location Date Value
1 1 Loc1 2022-01-27 5
2 1 Loc1 2022-01-27 4
3 1 Loc1 2022-01-28 7
4 1 Loc2 2022-01-29 8
5 2 Loc1 2022-01-27 11
6 2 Loc2 2022-01-28 4
7 2 Loc2 2022-01-29 6
8 3 Loc1 2022-01-28 9
9 3 Loc1 2022-01-28 9
10 3 Loc2 2022-01-29 1
{ID: 1, Location: Loc1, Date: 2022-01-27} is one such group, and its member values 5 and 4 should add up to 9.
{ID: 3, Location: Loc1, Date: 2022-01-28} is another group, and its sum should be 18.
The example input, processed/summed and written to a new CSV, should look like this:
ID Location Date Value
1 Loc1 2022-01-27 9
1 Loc1 2022-01-28 7
1 Loc2 2022-01-29 8
2 Loc1 2022-01-27 11
2 Loc2 2022-01-28 4
2 Loc2 2022-01-29 6
3 Loc1 2022-01-28 18
3 Loc2 2022-01-29 1
I know that using df.groupby([columns]).sum() would give the desired result, but the CSV file is so large that I keep running into memory errors. I've tried looking at other ways to read/manipulate the CSV data, but still without success, so if anyone knows a way I can do this in Python without maxing out my memory, that would be great!
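Roughly what I'm doing at the moment (a sketch only; the real file paths and script differ):

import pandas as pd

# This is the approach that keeps running out of memory:
# the whole CSV is loaded into a single DataFrame first.
df = pd.read_csv('input.csv', index_col=0)   # index_col=0 drops the unnamed first column
summed = df.groupby(['ID', 'Location', 'Date'], as_index=False)['Value'].sum()
summed.to_csv('output.csv', index=False)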
Note: I know there is an unnamed first column in my initial CSV; it is irrelevant and doesn't need to be in the output, but it doesn't matter either way :)
Have you tried:
output = []
for key, group in df.groupby([columns]):       # `columns`: your list of grouping columns
    output.append((key, group['a'].sum()))     # 'a': the column to sum, e.g. 'Value'
pd.DataFrame(output).to_csv("....csv")
Source:
The proper answer would probably be to use Dask, but you can do it with Pandas and chunks (a rough Dask sketch is included at the end of this answer for reference). The last_row variable holds the last row of the previous chunk, in case the first row of the current chunk has the same ID, Location and Date.
import pandas as pd

chunksize = 4  # Number of rows
last_row = pd.DataFrame()  # Last row of the previous chunk

with open('data.csv') as reader, open('output.csv', 'w') as writer:
    # Write headers
    writer.write(reader.readline())
    reader.seek(0)

    for chunk in pd.read_csv(reader, chunksize=chunksize):
        df = pd.concat([last_row, chunk])
        df = df.groupby(['ID', 'Location', 'Date'], as_index=False)['Value'].sum()
        df, last_row = df.iloc[:-1], df.iloc[-1:]
        df.to_csv(writer, header=False, index=False)

    # Don't forget the last row!
    last_row.to_csv(writer, header=False, index=False)
Content of output.csv:
ID,Location,Date,Value
1,Loc1,2022-01-27,9
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,18
3,Loc2,2022-01-29,1
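For reference, a rough Dask sketch of the same aggregation (untested on the real data; it assumes dask[dataframe] is installed and the file names are placeholders):

import dask.dataframe as dd

# Dask reads the CSV lazily in partitions, so the whole file never sits in memory at once.
ddf = dd.read_csv('data.csv')
result = ddf.groupby(['ID', 'Location', 'Date'])['Value'].sum().reset_index()
# single_file=True concatenates the partitions into a single output CSV
result.to_csv('output.csv', single_file=True, index=False)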
If the rows to be merged are consecutive, the good old csv module lets you process the large file one row at a time, so it uses minimal memory.
Here you could use:
import csv

with open('input.csv') as fd, open('output.csv', 'w', newline='') as fdout:
    rd, wr = csv.reader(fd), csv.writer(fdout)
    _ = wr.writerow(next(rd))        # header line
    old = [None] * 4
    for row in rd:
        row[3] = int(row[3])         # convert value field to integer
        if row[:3] == old[:3]:
            old[3] += row[3]         # accumulate values of similar rows
        else:
            if old[0]:               # and write the accumulated row
                _ = wr.writerow(old)
            old = row
    if old[0]:                       # do not forget the last row...
        _ = wr.writerow(old)
With the input data shown, it gives the expected result:
ID,Location,Date,Value
1,Loc1,2022-01-27,9
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,18
3,Loc2,2022-01-29,1
Not as clean and tidy as the Pandas code, but it should handle files larger than the available memory without any problem.
You could use the built-in csv library and build the output line by line. A Counter can be used to combine and sum the rows with the same entries:
from collections import Counter
import csv

data = Counter()

with open('input.csv') as f_input:
    csv_input = csv.reader(f_input)
    header = next(csv_input)
    for row in csv_input:
        data[tuple(row[:3])] += int(row[3])

with open('output.csv', 'w', newline='') as f_output:
    csv_output = csv.writer(f_output)
    csv_output.writerow(header)
    for key, value in data.items():
        csv_output.writerow([*key, value])
Giving the output:
ID,Location,Date,Value
1,Loc1,2022-01-27,9
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,18
3,Loc2,2022-01-29,1
This avoids storing the input CSV in memory, only the output CSV data.
If that is also too large, data could be written out whenever the ID column changes. This does, though, assume the input is in ID order:
from collections import Counter
import csv

def write_id(csv_output, data):
    for key, value in data.items():
        csv_output.writerow([*key, value])
    data.clear()

data = Counter()
current_id = None

with open('input.csv') as f_input, open('output.csv', 'w', newline='') as f_output:
    csv_input = csv.reader(f_input)
    csv_output = csv.writer(f_output)
    header = next(csv_input)
    csv_output.writerow(header)
    for row in csv_input:
        if current_id and row[0] != current_id:
            write_id(csv_output, data)
        data[tuple(row[:3])] += int(row[3])
        current_id = row[0]
    write_id(csv_output, data)
For the given example, this would give the same output.
There are already a number of answers that may well suffice: @MartinEvans and @Corralien both recommend breaking up / chunking the input and output.
I'm especially curious whether @MartinEvans's answer works within your memory constraints: it is by far the simplest still-correct solution (as I see it).
If neither of those works, I think you are up against this question:
What makes a chunk such that all the ID/Loc/Date groups I need to count are contained in that chunk, so no group crosses over a chunk boundary and gets counted multiple times (ending up with smaller sub-sums instead of a single, true sum)?
In a comment to the OP you said the input is sorted by "week number". I take that to be the sole determinant of when all the counts for a set of ID/Loc/Date groups are complete. Once the reader has read past a week-group boundary, it knows it is "safe" to stop counting any groups it has seen so far and to flush those counts to disk (to avoid holding too many counts in memory).
This solution relies on your input CSV being pre-sorted. If your input is a bit out of order, though, you could run this, test for duplicate groups (a small sketch of that check is at the end of this answer), re-sort, and re-run it (I see this problem as one big, memory-constrained reducer):
import csv
from collections import Counter
from datetime import datetime

# Get the data out...
out_csv = open('output.csv', 'w', newline='')
writer = csv.writer(out_csv)

def write_row(row):
    global writer
    writer.writerow(row)

# Don't let the counter get too big (for memory)
def flush_counter(counter):
    for key, sum_ in counter.items():
        id_, loc, date = key
        write_row([id_, loc, date, sum_])

# You said "already grouped by week-number", so:
# - read and sum your input CSV in chunks of "week (number) groups"
# - once the reader reads past a week-group, it concludes that week-group is finished
#   and flushes the counts for that week-group
last_wk_group = None
counter = Counter()

# Open input
with open('input.csv', newline='') as f:
    reader = csv.reader(f)

    # Copy header
    header = next(reader)
    write_row(header)

    for row in reader:
        # Get "base" values
        id_, loc, date = row[0:3]
        value = int(row[3])

        # 2022-01-27 -> 2022-04
        wk_group = datetime.strptime(date, r'%Y-%m-%d').strftime(r'%Y-%U')

        # Decide if the last week-group has passed
        if wk_group != last_wk_group:
            flush_counter(counter)
            counter = Counter()
            last_wk_group = wk_group

        # Count/sum within this week-group
        key = (id_, loc, date)
        counter[key] += value

# Flush the remaining week-group counts
flush_counter(counter)
out_csv.close()
As a basic test, I moved the first row of the sample input down to the last row, like @Corralien asked about:
ID,Location,Date,Value
1,Loc1,2022-01-27,5
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,9
3,Loc1,2022-01-28,9
3,Loc2,2022-01-29,1
1,Loc1,2022-01-27,4
and I still get the correct output (and even in the right order, because 1,Loc1,2022-01-27 appears first in the input):
ID,Location,Date,Value
1,Loc1,2022-01-27,9
1,Loc1,2022-01-28,7
1,Loc2,2022-01-29,8
2,Loc1,2022-01-27,11
2,Loc2,2022-01-28,4
2,Loc2,2022-01-29,6
3,Loc1,2022-01-28,18
3,Loc2,2022-01-29,1
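To go with the "test for duplicate groups, re-sort, and re-run" suggestion above: a rough sketch of how you might check the produced output.csv for any ID/Loc/Date group that was written more than once (it only needs to hold the output keys in memory, not the input rows):

import csv
from collections import Counter

# Count how many times each (ID, Location, Date) key appears in the output.
seen = Counter()
with open('output.csv', newline='') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    for row in reader:
        seen[tuple(row[:3])] += 1

duplicates = {key: n for key, n in seen.items() if n > 1}
print(duplicates if duplicates else 'no duplicate groups - output is fully aggregated')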