Large csv files: MemoryError: Unable to allocate 3.25 GiB for an array with shape (7, 62388743) and data type object
Large csv files: MemoryError: Unable to allocate 3.25 GiB for an array with shape (7, 62388743) and data type object
我正在分析一些非常大的文件(约 2 亿行)
csv_filename=pd.read_csv('filename.txt',sep="\t",error_bad_lines=False)
在我收到此错误消息之前,程序运行了大约半小时:
MemoryError: Unable to allocate 3.25 GiB for an array with shape (7, 62388743) and data type object
我想知道是否有办法绕过此内存错误,或者是否有我可以使用的不需要那么多内存的不同功能?我已将文件拆分成多个部分,但问题是我需要一个数据框中的所有数据,以便我可以将其作为一个整体进行分析。
您可以获得内存更大的计算机、simplify/aggregate 数据或尝试其他工具。
如果我这样做,处理这么大的文件,我会切换到 Spark 并使用 PySpark API。
PySpark 将使您能够在超出机器可用内存的情况下使用 pandas 操作。非常适合大数据操作。
In [49]: 7*62388743*8/1e9
Out[49]: 3.493769608
这是您的 3.25 GiB。那只是指向对象的指针。
('field1', dtype('int64')) ('field2', dtype('int64')) ('field3', dtype('O')) ('field4', dtype('O')) ('field5', dtype('O')) ('field6', dtype('O')) ('field7', dtype('O')) ('field8', dtype('O')) ('field9', dtype('O')) ('field10', dtype('int64')) ('field11', dtype('float64')) ('field12', dtype('int64'))
我数了 7 个 dtype('O')
字段;这可以解释形状中的 7;我假设的另一个值是行。 int64
和 float 字段将放在它们自己的数组中。
在 pandas
中,带有字符串的列是对象数据类型;也就是说,它将字符串存储为常规 Python 字符串,因此需要指针。
根据那个字段列表,1,10,11,12都是数字。但同样,如果内容不“正确”,pandas
可能会将它们更改为对象,使用字符串、None、NA 等
您可以使用 usecols
限制列数。这将减少内存占用。您似乎在 CSV 文件中也有一些错误数据,使您认为应该 int64
的列成为 object
。这些可以是空单元格,或任何非数字值。这是一个读取 csv 然后扫描错误数据的示例。此示例使用逗号而不是制表符,因为这样更容易演示。
import pandas as pd
import numpy as np
import io
import re
test_csv = io.StringIO("""field1,field2,field3,other
1,2,3,this
4,what?,6,is
7,,9,extra""")
_numbers_re = re.compile(r"\d+$")
df = pd.read_csv(test_csv,sep=",",error_bad_lines=False,
usecols=['field1', 'field2', 'field3'])
print(df)
# columns that arent int64
bad_cols = list(df.dtypes[df.dtypes!=np.dtype('int64')].index)
if bad_cols:
print("bad cols", bad_cols)
for bad_col in bad_cols:
col = df[bad_col]
bad = col[col.str.match(_numbers_re) != True]
print(bad)
exit(1)
您可以逐行读取您的 csv 文件并在使用迭代器的同时计算指标,而不会增加内存使用量。但您不必从头开始(见下文)。
看看 convtools,它是一个轻量级的 python 库,它允许您定义转换,完成后,它会编写和编译 ad-hoc python引擎盖下的代码,所以你有做你想要的事情的功能。
我使用下面的代码生成了一个测试 csv 文件(81MB;200 万行;3 列)
import csv
from tqdm import tqdm
from random import random, choice
field_1_values = [
"abcde1234567890",
"cde1234567890",
"def1234567890",
"fgh1234567890",
]
with open("input_data-small.csv", "w") as f:
writer = csv.writer(f)
writer.writerow(["field_1", "field_2", "field_3"])
for index in tqdm(range(2000000)):
writer.writerow([choice(field_1_values), random(), index])
让我们使用 convtools
来处理它,例如:
- field_1
的唯一值
- field_2
的第一个值
- field_2的最大值
- field_2
的平均价值
- 行保持 field_2
的最小值
import csv
# pip install convtools
from convtools import conversion as c
# pip install tqdm (for a nice progress bar)
from tqdm import tqdm
# define optional conversions to prepare data
column_to_prepare_conversion = {
"field_2": c.this().as_type(float),
"field_3": c.call_func(int, c.this()),
}
with open("input_data-small.csv", "r") as f:
# csv.DictReader is slow, so let's work with raw rows
reader = csv.reader(f)
# fetch column names
column_names = next(reader)
# adding some free syntactic sugar (no overhead in compiled code):
# allows referencing columns by names, while indexes are used under the hood
def column_(column_name):
return c.item(column_names.index(column_name))
if column_to_prepare_conversion:
prepare_input = c.iter(
# creates iterable of prepared tuples
tuple(
column_(column_name).pipe(
column_to_prepare_conversion[column_name]
)
if column_name in column_to_prepare_conversion
else column_(column_name)
for column_name in column_names
)
)
else:
prepare_input = c.this()
# let's aggregate something
converter = prepare_input.pipe(
# of course there is group_by too: https://convtools.readthedocs.io/en/latest/cheatsheet.html#group-by-simple
c.aggregate(
{
"unique_field_1": c.ReduceFuncs.ArrayDistinct(
column_("field_1")
),
"first_field_2": c.ReduceFuncs.First(column_("field_2")),
"max_field_2": c.ReduceFuncs.Max(column_("field_2")),
"avg_field_2": c.ReduceFuncs.Average(column_("field_2")),
"row_with_min_field_2": c.ReduceFuncs.MinRow(
column_("field_2")
),
}
)
).gen_converter(
# if black is installed, it will print black-formatted code
debug=True
)
results = converter(tqdm(reader))
print(results)
结果:
# 2000000it [00:04, 462371.13it/s]
{
"unique_field_1": [
"cde1234567890",
"fgh1234567890",
"abcde1234567890",
"def1234567890",
],
"first_field_2": 4.149475385772927e-05,
"max_field_2": 0.9999996797416377,
"avg_field_2": 0.49995239963138766,
"row_with_min_field_2": ("fgh1234567890", 3.6033313821626223e-07, 425211),
}
所以,如果我的测试文件有 200 行铣削,处理 462K row/s 将花费 ~ 433 秒(7 分 13 秒)。
JFYI:没有任何数据准备和计算的简单文件读取需要大约 203 秒(3 分 23 秒):
with open("input_data-small.csv", "r") as f:
reader = csv.reader(f)
for row in tqdm(reader):
pass
# 2000001it [00:02, 983404.81it/s]
我正在分析一些非常大的文件(约 2 亿行)
csv_filename=pd.read_csv('filename.txt',sep="\t",error_bad_lines=False)
在我收到此错误消息之前,程序运行了大约半小时:
MemoryError: Unable to allocate 3.25 GiB for an array with shape (7, 62388743) and data type object
我想知道是否有办法绕过此内存错误,或者是否有我可以使用的不需要那么多内存的不同功能?我已将文件拆分成多个部分,但问题是我需要一个数据框中的所有数据,以便我可以将其作为一个整体进行分析。
您可以获得内存更大的计算机、simplify/aggregate 数据或尝试其他工具。
如果我这样做,处理这么大的文件,我会切换到 Spark 并使用 PySpark API。
PySpark 将使您能够在超出机器可用内存的情况下使用 pandas 操作。非常适合大数据操作。
In [49]: 7*62388743*8/1e9
Out[49]: 3.493769608
这是您的 3.25 GiB。那只是指向对象的指针。
('field1', dtype('int64')) ('field2', dtype('int64')) ('field3', dtype('O')) ('field4', dtype('O')) ('field5', dtype('O')) ('field6', dtype('O')) ('field7', dtype('O')) ('field8', dtype('O')) ('field9', dtype('O')) ('field10', dtype('int64')) ('field11', dtype('float64')) ('field12', dtype('int64'))
我数了 7 个 dtype('O')
字段;这可以解释形状中的 7;我假设的另一个值是行。 int64
和 float 字段将放在它们自己的数组中。
在 pandas
中,带有字符串的列是对象数据类型;也就是说,它将字符串存储为常规 Python 字符串,因此需要指针。
根据那个字段列表,1,10,11,12都是数字。但同样,如果内容不“正确”,pandas
可能会将它们更改为对象,使用字符串、None、NA 等
您可以使用 usecols
限制列数。这将减少内存占用。您似乎在 CSV 文件中也有一些错误数据,使您认为应该 int64
的列成为 object
。这些可以是空单元格,或任何非数字值。这是一个读取 csv 然后扫描错误数据的示例。此示例使用逗号而不是制表符,因为这样更容易演示。
import pandas as pd
import numpy as np
import io
import re
test_csv = io.StringIO("""field1,field2,field3,other
1,2,3,this
4,what?,6,is
7,,9,extra""")
_numbers_re = re.compile(r"\d+$")
df = pd.read_csv(test_csv,sep=",",error_bad_lines=False,
usecols=['field1', 'field2', 'field3'])
print(df)
# columns that arent int64
bad_cols = list(df.dtypes[df.dtypes!=np.dtype('int64')].index)
if bad_cols:
print("bad cols", bad_cols)
for bad_col in bad_cols:
col = df[bad_col]
bad = col[col.str.match(_numbers_re) != True]
print(bad)
exit(1)
您可以逐行读取您的 csv 文件并在使用迭代器的同时计算指标,而不会增加内存使用量。但您不必从头开始(见下文)。
看看 convtools,它是一个轻量级的 python 库,它允许您定义转换,完成后,它会编写和编译 ad-hoc python引擎盖下的代码,所以你有做你想要的事情的功能。
我使用下面的代码生成了一个测试 csv 文件(81MB;200 万行;3 列)
import csv
from tqdm import tqdm
from random import random, choice
field_1_values = [
"abcde1234567890",
"cde1234567890",
"def1234567890",
"fgh1234567890",
]
with open("input_data-small.csv", "w") as f:
writer = csv.writer(f)
writer.writerow(["field_1", "field_2", "field_3"])
for index in tqdm(range(2000000)):
writer.writerow([choice(field_1_values), random(), index])
让我们使用 convtools
来处理它,例如:
- field_1 的唯一值
- field_2 的第一个值
- field_2的最大值
- field_2 的平均价值
- 行保持 field_2 的最小值
import csv
# pip install convtools
from convtools import conversion as c
# pip install tqdm (for a nice progress bar)
from tqdm import tqdm
# define optional conversions to prepare data
column_to_prepare_conversion = {
"field_2": c.this().as_type(float),
"field_3": c.call_func(int, c.this()),
}
with open("input_data-small.csv", "r") as f:
# csv.DictReader is slow, so let's work with raw rows
reader = csv.reader(f)
# fetch column names
column_names = next(reader)
# adding some free syntactic sugar (no overhead in compiled code):
# allows referencing columns by names, while indexes are used under the hood
def column_(column_name):
return c.item(column_names.index(column_name))
if column_to_prepare_conversion:
prepare_input = c.iter(
# creates iterable of prepared tuples
tuple(
column_(column_name).pipe(
column_to_prepare_conversion[column_name]
)
if column_name in column_to_prepare_conversion
else column_(column_name)
for column_name in column_names
)
)
else:
prepare_input = c.this()
# let's aggregate something
converter = prepare_input.pipe(
# of course there is group_by too: https://convtools.readthedocs.io/en/latest/cheatsheet.html#group-by-simple
c.aggregate(
{
"unique_field_1": c.ReduceFuncs.ArrayDistinct(
column_("field_1")
),
"first_field_2": c.ReduceFuncs.First(column_("field_2")),
"max_field_2": c.ReduceFuncs.Max(column_("field_2")),
"avg_field_2": c.ReduceFuncs.Average(column_("field_2")),
"row_with_min_field_2": c.ReduceFuncs.MinRow(
column_("field_2")
),
}
)
).gen_converter(
# if black is installed, it will print black-formatted code
debug=True
)
results = converter(tqdm(reader))
print(results)
结果:
# 2000000it [00:04, 462371.13it/s]
{
"unique_field_1": [
"cde1234567890",
"fgh1234567890",
"abcde1234567890",
"def1234567890",
],
"first_field_2": 4.149475385772927e-05,
"max_field_2": 0.9999996797416377,
"avg_field_2": 0.49995239963138766,
"row_with_min_field_2": ("fgh1234567890", 3.6033313821626223e-07, 425211),
}
所以,如果我的测试文件有 200 行铣削,处理 462K row/s 将花费 ~ 433 秒(7 分 13 秒)。
JFYI:没有任何数据准备和计算的简单文件读取需要大约 203 秒(3 分 23 秒):
with open("input_data-small.csv", "r") as f:
reader = csv.reader(f)
for row in tqdm(reader):
pass
# 2000001it [00:02, 983404.81it/s]