pandas 和 csv 的计算时间过长

Exaggerated calculation times with pandas and csv

我有一个 3 列 CSV 文件,我在其中使用 python 和 pandas.

执行简单计算

文件很大,不到4Gb,计算后约1.9Gb

CSV 文件是:

数据1、数据2、数据3

aftqgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24sq90ecnvqqjwvw97,856521536521321,112535 aftqgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24sq90ecnvqqjwvw98,6521321,112138 aftqgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24sq90ecnvqqjwvw98,856521536521321,122135 aftqgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24sq90ecnvqqjwvw99,521321,112132 aftqgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24sq90ecnvqqjwvw99,856521536521321,212135

计算是微不足道的。如果 A 列相同,则添加 B 并重写 CSV。 示例结果:

数据1、数据2、数据3

aftqgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24sq90ecnvqqjwvw97,856521536521321 aftqgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24sq90ecnvqqjwvw98,856521543042642 aftqgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24sq90ecnvqqjwvw99,856521537042642

import pandas as pd
#Read csv
df = pd.read_csv('data.csv', sep=',' , engine='python')

# Groupby and sum
df_new = df.groupby(["data1"]).agg({"data2": "sum"}).reset_index()

# Save in new file 
df_new.to_csv('data2.csv', encoding='utf-8', index=False)

如何改进代码以加快执行速度?

目前 vps 需要大约 7 个小时才能完成计算


添加信息

RAM 资源几乎总是 100% (8Gb),而选择引擎 = 'python' 是因为我使用了 https://whosebug.com/ 上已经存在的代码,老实说我没有知道该命令是否有用,但我已经看到计算正确。

Data3 实际上对我没用(现在,将来可能会有用)。

  1. 删除engine='python',没有用。
  2. 获得更多 RAM,8GB 不够,你永远不应该达到 100%(这会减慢你的速度)
  3. (现在为时已晚),但不要对大型数据集使用 .csv 文件。查看羽毛或镶木地板。

如果你不能获得更多内存,那么@Afaq 可能会详细说明文件拆分方法。我在那里看到的问题是,你没有减少你的数据集,所以 map reduce 可能会在 reduce 部分阻塞,除非你以这种方式拆分你的文件,相同的 data1 字符串总是会进入同一个文件。

还有一个替代选项 - 为此使用 convtools。它是一个纯 python 库,它生成纯 python 代码来构建临时转换器。当然,就速度而言,bare python 无法击败 pandas,但至少它不需要任何包装器,而且它的工作方式就像您手动实现所有内容一样。

因此,通常以下内容适合您:

from convtools import conversion as c
from convtools.contrib.tables import Table


# you can store the converter somewhere for further reuse
converter = (
    c.group_by(c.item("data1"))
    .aggregate({
        "data1": c.item("data1"),
        "data2": c.ReduceFuncs.Sum(c.item("data2"))
    })
    .gen_converter()
)

# this is an iterable (stream of rows), not the list
rows = Table.from_csv("tmp4.csv", header=True).into_iter_rows(dict)

Table.from_rows(converter(rows)).into_csv("out.csv")

JFYI:如果您手动 运行 脚本,那么您可以使用例如监控速度。 tqdm,只需包装一个您正在使用的可迭代对象:

from tqdm import tqdm

# same code as above, except for the last line:
Table.from_rows(converter(tqdm(rows))).into_csv("out.csv")

但是: 上面的解决方案不需要将输入文件放入内存,但结果应该。在您的情况下,如果结果是 1.9GB 的 csv 文件,则不太可能将相应的 python objects 装入 8GB RAM。

那么您可能需要:

  • 删除 header: tail -n +2 raw_file.csv > raw_file_no_header.csv
  • pre-sort 文件 sort raw_file_no_header.csv > sorted_file.csv
  • 然后:
from convtools import conversion as c
from convtools.contrib.tables import Table

converter = (
    c.chunk_by(c.item("data1"))
    .aggregate(
        {
            "data1": c.ReduceFuncs.First(c.item("data1")),
            "data2": c.ReduceFuncs.Sum(c.item("data2")),
        }
    )
    .gen_converter()
)
rows = Table.from_csv("sorted_file.csv", header=True).into_iter_rows(dict)
Table.from_rows(converter(rows)).into_csv("out.csv")

这只需要一个组来适应内存。