Large csv files: MemoryError: Unable to allocate 3.25 GiB for an array with shape (7, 62388743) and data type object

Large csv files: MemoryError: Unable to allocate 3.25 GiB for an array with shape (7, 62388743) and data type object

我正在分析一些非常大的文件(约 2 亿行)

csv_filename=pd.read_csv('filename.txt',sep="\t",error_bad_lines=False)

在我收到此错误消息之前,程序运行了大约半小时:

MemoryError: Unable to allocate 3.25 GiB for an array with shape (7, 62388743) and data type object

我想知道是否有办法绕过此内存错误,或者是否有我可以使用的不需要那么多内存的不同功能?我已将文件拆分成多个部分,但问题是我需要一个数据框中的所有数据,以便我可以将其作为一个整体进行分析。

您可以获得内存更大的计算机、simplify/aggregate 数据或尝试其他工具。

如果我这样做,处理这么大的文件,我会切换到 Spark 并使用 PySpark API。

PySpark 将使您能够在超出机器可用内存的情况下使用 pandas 操作。非常适合大数据操作。

http://spark.apache.org/docs/latest/api/python/

In [49]: 7*62388743*8/1e9
Out[49]: 3.493769608

这是您的 3.25 GiB。那只是指向对象的指针。

('field1', dtype('int64')) ('field2', dtype('int64')) ('field3', dtype('O')) ('field4', dtype('O')) ('field5', dtype('O')) ('field6', dtype('O')) ('field7', dtype('O')) ('field8', dtype('O')) ('field9', dtype('O')) ('field10', dtype('int64')) ('field11', dtype('float64')) ('field12', dtype('int64'))

我数了 7 个 dtype('O') 字段;这可以解释形状中的 7;我假设的另一个值是行。 int64 和 float 字段将放在它们自己的数组中。

pandas 中,带有字符串的列是对象数据类型;也就是说,它将字符串存储为常规 Python 字符串,因此需要指针。

根据那个字段列表,1,10,11,12都是数字。但同样,如果内容不“正确”,pandas 可能会将它们更改为对象,使用字符串、None、NA 等

您可以使用 usecols 限制列数。这将减少内存占用。您似乎在 CSV 文件中也有一些错误数据,使您认为应该 int64 的列成为 object。这些可以是空单元格,或任何非数字值。这是一个读取 csv 然后扫描错误数据的示例。此示例使用逗号而不是制表符,因为这样更容易演示。

import pandas as pd
import numpy as np
import io
import re

test_csv = io.StringIO("""field1,field2,field3,other
1,2,3,this
4,what?,6,is
7,,9,extra""")

_numbers_re = re.compile(r"\d+$")

df = pd.read_csv(test_csv,sep=",",error_bad_lines=False, 
    usecols=['field1', 'field2', 'field3'])
print(df)

# columns that arent int64
bad_cols = list(df.dtypes[df.dtypes!=np.dtype('int64')].index)
if bad_cols:
    print("bad cols", bad_cols)
    for bad_col in bad_cols:
        col = df[bad_col]
        bad = col[col.str.match(_numbers_re) != True]
        print(bad)
    exit(1)

您可以逐行读取您的 csv 文件并在使用迭代器的同时计算指标,而不会增加内存使用量。但您不必从头开始(见下文)。

看看 convtools,它是一个轻量级的 python 库,它允许您定义转换,完成后,它会编写和编译 ad-hoc python引擎盖下的代码,所以你有做你想要的事情的功能。

我使用下面的代码生成了一个测试 csv 文件(81MB;200 万行;3 列)

import csv
from tqdm import tqdm
from random import random, choice

field_1_values = [
    "abcde1234567890",
    "cde1234567890",
    "def1234567890",
    "fgh1234567890",
]

with open("input_data-small.csv", "w") as f:
    writer = csv.writer(f)
    writer.writerow(["field_1", "field_2", "field_3"])
    for index in tqdm(range(2000000)):
        writer.writerow([choice(field_1_values), random(), index])

让我们使用 convtools 来处理它,例如:

  • field_1
  • 的唯一值
  • field_2
  • 的第一个值
  • field_2的最大值
  • field_2
  • 的平均价值
  • 行保持 field_2
  • 的最小值
import csv

# pip install convtools
from convtools import conversion as c

# pip install tqdm (for a nice progress bar)
from tqdm import tqdm


# define optional conversions to prepare data
column_to_prepare_conversion = {
    "field_2": c.this().as_type(float),
    "field_3": c.call_func(int, c.this()),
}

with open("input_data-small.csv", "r") as f:
    # csv.DictReader is slow, so let's work with raw rows
    reader = csv.reader(f)

    # fetch column names
    column_names = next(reader)

    # adding some free syntactic sugar (no overhead in compiled code):
    # allows referencing columns by names, while indexes are used under the hood
    def column_(column_name):
        return c.item(column_names.index(column_name))

    if column_to_prepare_conversion:
        prepare_input = c.iter(
            # creates iterable of prepared tuples
            tuple(
                column_(column_name).pipe(
                    column_to_prepare_conversion[column_name]
                )
                if column_name in column_to_prepare_conversion
                else column_(column_name)
                for column_name in column_names
            )
        )
    else:
        prepare_input = c.this()

    # let's aggregate something
    converter = prepare_input.pipe(
        # of course there is group_by too: https://convtools.readthedocs.io/en/latest/cheatsheet.html#group-by-simple
        c.aggregate(
            {
                "unique_field_1": c.ReduceFuncs.ArrayDistinct(
                    column_("field_1")
                ),
                "first_field_2": c.ReduceFuncs.First(column_("field_2")),
                "max_field_2": c.ReduceFuncs.Max(column_("field_2")),
                "avg_field_2": c.ReduceFuncs.Average(column_("field_2")),
                "row_with_min_field_2": c.ReduceFuncs.MinRow(
                    column_("field_2")
                ),
            }

        )
    ).gen_converter(
        # if black is installed, it will print black-formatted code
        debug=True
    )

    results = converter(tqdm(reader))
print(results)

结果:

# 2000000it [00:04, 462371.13it/s]
{
    "unique_field_1": [
        "cde1234567890",
        "fgh1234567890",
        "abcde1234567890",
        "def1234567890",
    ],
    "first_field_2": 4.149475385772927e-05,
    "max_field_2": 0.9999996797416377,
    "avg_field_2": 0.49995239963138766,
    "row_with_min_field_2": ("fgh1234567890", 3.6033313821626223e-07, 425211),
}

所以,如果我的测试文件有 200 行铣削,处理 462K row/s 将花费 ~ 433 秒(7 分 13 秒)。

JFYI:没有任何数据准备和计算的简单文件读取需要大约 203 秒(3 分 23 秒):

with open("input_data-small.csv", "r") as f:
    reader = csv.reader(f)
    for row in tqdm(reader):
        pass
# 2000001it [00:02, 983404.81it/s]