加快将数十亿行写入 HDF5
Speed up writing billions of rows to HDF5
这是我在问题 中试图讨论的场景的延续。请阅读问题以获取有关以下内容的更多详细信息。
由于上面的链接问题由于主题过于宽泛而被关闭,我没有机会从处理数百 GB 数据方面更有经验的人那里收集想法。我对此没有任何经验,而且我正在学习。我显然在某处犯了一些错误,因为我的方法花费的时间太长无法完成。
数据如我在上面的链接问题中所述。我决定为每个传感器创建一个节点(组)(以传感器 ID 作为节点名称,在 root 下)来存储我拥有的 260k 个传感器中的每一个生成的数据。该文件最终将包含 260k 个节点,每个节点将在其下的 Table 中存储几 GB 的数据。完成所有繁重工作的代码如下:
with pd.HDFStore(hdf_path, mode='w') as hdf_store:
for file in files:
# Read CSV files in Pandas
fp = os.path.normpath(os.path.join(path, str(file).zfill(2)) + '.csv')
df = pd.read_csv(fp, names=data_col_names, skiprows=1, header=None,
chunksize=chunk_size, dtype=data_dtype)
for chunk in df:
# Manipulate date & epoch to get it in human readable form
chunk['DATE'] = pd.to_datetime(chunk['DATE'], format='%m%d%Y', box=False)
chunk['EPOCH'] = pd.to_timedelta(chunk['EPOCH']*5, unit='m')
chunk['DATETIME'] = chunk['DATE'] + chunk['EPOCH']
#Group on Sensor to store in HDF5 file
grouped = chunk.groupby('Sensor')
for group, data in grouped:
data.index = data['DATETIME']
hdf_store.append(group, data.loc[:,['R1', 'R2', 'R3']])
# Adding sensor information as metadata to nodes
for sens in sensors:
try:
hdf_store.get_storer(sens).attrs.metadata = sens_dict[sens]
hdf_store.get_storer(sens).attrs['TITLE'] = sens
except AttributeError:
pass
如果我注释掉行 hdf_store.append(group, data.loc[:,['R1', 'R2', 'R3']])
,for chunk in df:
下的位需要大约 40 - 45 秒 来完成处理迭代。 (我正在读取的块大小是 1M 行。)但是代码中包含该行(也就是说,如果将分组块写入 HDF 文件)代码大约需要 10 - 12 分钟 每次迭代。我对执行时间的增加感到非常困惑。我不知道是什么导致了这种情况。
请给我一些解决问题的建议。请注意,我无法承受那么长的执行时间。我需要以这种方式处理大约 220 GB 的数据。稍后我需要查询该数据,一次查询一个节点,以进行进一步分析。我已经花了 4 天多的时间研究这个主题,但我仍然像刚开始时一样难过。
#### 编辑 1 ####
对于包含 1M 行的块,包括 df.info()
。
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1000000 entries, 0 to 999999
Data columns (total 7 columns):
SENSOR 1000000 non-null object
DATE 1000000 non-null datetime64[ns]
EPOCH 1000000 non-null timedelta64[ns]
R1 1000000 non-null float32
R2 773900 non-null float32
R3 483270 non-null float32
DATETIME 1000000 non-null datetime64[ns]
dtypes: datetime64[ns](2), float32(3), object(1), timedelta64[ns](1)
memory usage: 49.6+ MB
其中,只有 DATETIME, R1, R2, R3
被写入文件。
#### 编辑 2 ####
包括 pd.show_versions()
In [ ] : pd.show_versions()
Out [ ] : INSTALLED VERSIONS
------------------
commit: None
python: 3.4.3.final.0
python-bits: 64
OS: Windows
OS-release: 8
machine: AMD64
processor: Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
byteorder: little
LC_ALL: None
LANG: None
pandas: 0.17.0
nose: 1.3.7
pip: 7.1.2
setuptools: 18.4
Cython: 0.23.2
numpy: 1.10.1
scipy: 0.16.0
statsmodels: 0.6.1
IPython: 4.0.0
sphinx: 1.3.1
patsy: 0.4.0
dateutil: 2.4.1
pytz: 2015.6
blosc: None
bottleneck: 1.0.0
tables: 3.2.2
numexpr: 2.4.4
matplotlib: 1.4.3
openpyxl: 2.0.2
xlrd: 0.9.4
xlwt: 1.0.0
xlsxwriter: 0.7.3
lxml: 3.4.4
bs4: 4.3.2
html5lib: None
httplib2: None
apiclient: None
sqlalchemy: 1.0.8
pymysql: None
psycopg2: None
您不断地对您写入的行进行索引。写入所有行然后创建索引效率更高。
请参阅有关创建索引的文档 here。
追加操作通过index=False
;这将关闭索引。
然后当你最终完成时,运行(在每个节点上),假设 store
是你的 HDFStore
。
store.create_table_index('node')
此操作需要一些时间,但不会连续执行一次,而是会执行一次。这产生了巨大的差异,因为创建可以考虑到您的所有数据(并且只移动一次)。
您可能还想 ptrepack
您的数据(在索引操作之前或之后),以重置 chunksize
。我不会直接指定它,而是设置 chunksize='auto'
让它在写入所有数据后找出最佳大小。
所以这应该是一个相当快的操作(即使有索引)。
In [38]: N = 1000000
In [39]: df = DataFrame(np.random.randn(N,3).astype(np.float32),columns=list('ABC'),index=pd.date_range('20130101',freq='ms',periods=N))
In [40]: df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1000000 entries, 2013-01-01 00:00:00 to 2013-01-01 00:16:39.999000
Freq: L
Data columns (total 3 columns):
A 1000000 non-null float32
B 1000000 non-null float32
C 1000000 non-null float32
dtypes: float32(3)
memory usage: 19.1 MB
In [41]: store = pd.HDFStore('test.h5',mode='w')
In [42]: def write():
....: for i in range(10):
....: dfi = df.copy()
....: dfi.index = df.index + pd.Timedelta(minutes=i)
....: store.append('df',dfi)
....:
In [43]: %timeit -n 1 -r 1 write()
1 loops, best of 1: 4.26 s per loop
In [44]: store.close()
In [45]: pd.read_hdf('test.h5','df').info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 10000000 entries, 2013-01-01 00:00:00 to 2013-01-01 00:25:39.999000
Data columns (total 3 columns):
A float32
B float32
C float32
dtypes: float32(3)
memory usage: 190.7 MB
版本
In [46]: pd.__version__
Out[46]: u'0.17.0'
In [49]: import tables
In [50]: tables.__version__
Out[50]: '3.2.2'
In [51]: np.__version__
Out[51]: '1.10.1'
这是我在问题 中试图讨论的场景的延续。请阅读问题以获取有关以下内容的更多详细信息。
由于上面的链接问题由于主题过于宽泛而被关闭,我没有机会从处理数百 GB 数据方面更有经验的人那里收集想法。我对此没有任何经验,而且我正在学习。我显然在某处犯了一些错误,因为我的方法花费的时间太长无法完成。
数据如我在上面的链接问题中所述。我决定为每个传感器创建一个节点(组)(以传感器 ID 作为节点名称,在 root 下)来存储我拥有的 260k 个传感器中的每一个生成的数据。该文件最终将包含 260k 个节点,每个节点将在其下的 Table 中存储几 GB 的数据。完成所有繁重工作的代码如下:
with pd.HDFStore(hdf_path, mode='w') as hdf_store:
for file in files:
# Read CSV files in Pandas
fp = os.path.normpath(os.path.join(path, str(file).zfill(2)) + '.csv')
df = pd.read_csv(fp, names=data_col_names, skiprows=1, header=None,
chunksize=chunk_size, dtype=data_dtype)
for chunk in df:
# Manipulate date & epoch to get it in human readable form
chunk['DATE'] = pd.to_datetime(chunk['DATE'], format='%m%d%Y', box=False)
chunk['EPOCH'] = pd.to_timedelta(chunk['EPOCH']*5, unit='m')
chunk['DATETIME'] = chunk['DATE'] + chunk['EPOCH']
#Group on Sensor to store in HDF5 file
grouped = chunk.groupby('Sensor')
for group, data in grouped:
data.index = data['DATETIME']
hdf_store.append(group, data.loc[:,['R1', 'R2', 'R3']])
# Adding sensor information as metadata to nodes
for sens in sensors:
try:
hdf_store.get_storer(sens).attrs.metadata = sens_dict[sens]
hdf_store.get_storer(sens).attrs['TITLE'] = sens
except AttributeError:
pass
如果我注释掉行 hdf_store.append(group, data.loc[:,['R1', 'R2', 'R3']])
,for chunk in df:
下的位需要大约 40 - 45 秒 来完成处理迭代。 (我正在读取的块大小是 1M 行。)但是代码中包含该行(也就是说,如果将分组块写入 HDF 文件)代码大约需要 10 - 12 分钟 每次迭代。我对执行时间的增加感到非常困惑。我不知道是什么导致了这种情况。
请给我一些解决问题的建议。请注意,我无法承受那么长的执行时间。我需要以这种方式处理大约 220 GB 的数据。稍后我需要查询该数据,一次查询一个节点,以进行进一步分析。我已经花了 4 天多的时间研究这个主题,但我仍然像刚开始时一样难过。
#### 编辑 1 ####
对于包含 1M 行的块,包括 df.info()
。
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1000000 entries, 0 to 999999
Data columns (total 7 columns):
SENSOR 1000000 non-null object
DATE 1000000 non-null datetime64[ns]
EPOCH 1000000 non-null timedelta64[ns]
R1 1000000 non-null float32
R2 773900 non-null float32
R3 483270 non-null float32
DATETIME 1000000 non-null datetime64[ns]
dtypes: datetime64[ns](2), float32(3), object(1), timedelta64[ns](1)
memory usage: 49.6+ MB
其中,只有 DATETIME, R1, R2, R3
被写入文件。
#### 编辑 2 ####
包括 pd.show_versions()
In [ ] : pd.show_versions()
Out [ ] : INSTALLED VERSIONS
------------------
commit: None
python: 3.4.3.final.0
python-bits: 64
OS: Windows
OS-release: 8
machine: AMD64
processor: Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
byteorder: little
LC_ALL: None
LANG: None
pandas: 0.17.0
nose: 1.3.7
pip: 7.1.2
setuptools: 18.4
Cython: 0.23.2
numpy: 1.10.1
scipy: 0.16.0
statsmodels: 0.6.1
IPython: 4.0.0
sphinx: 1.3.1
patsy: 0.4.0
dateutil: 2.4.1
pytz: 2015.6
blosc: None
bottleneck: 1.0.0
tables: 3.2.2
numexpr: 2.4.4
matplotlib: 1.4.3
openpyxl: 2.0.2
xlrd: 0.9.4
xlwt: 1.0.0
xlsxwriter: 0.7.3
lxml: 3.4.4
bs4: 4.3.2
html5lib: None
httplib2: None
apiclient: None
sqlalchemy: 1.0.8
pymysql: None
psycopg2: None
您不断地对您写入的行进行索引。写入所有行然后创建索引效率更高。
请参阅有关创建索引的文档 here。
追加操作通过index=False
;这将关闭索引。
然后当你最终完成时,运行(在每个节点上),假设 store
是你的 HDFStore
。
store.create_table_index('node')
此操作需要一些时间,但不会连续执行一次,而是会执行一次。这产生了巨大的差异,因为创建可以考虑到您的所有数据(并且只移动一次)。
您可能还想 ptrepack
您的数据(在索引操作之前或之后),以重置 chunksize
。我不会直接指定它,而是设置 chunksize='auto'
让它在写入所有数据后找出最佳大小。
所以这应该是一个相当快的操作(即使有索引)。
In [38]: N = 1000000
In [39]: df = DataFrame(np.random.randn(N,3).astype(np.float32),columns=list('ABC'),index=pd.date_range('20130101',freq='ms',periods=N))
In [40]: df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1000000 entries, 2013-01-01 00:00:00 to 2013-01-01 00:16:39.999000
Freq: L
Data columns (total 3 columns):
A 1000000 non-null float32
B 1000000 non-null float32
C 1000000 non-null float32
dtypes: float32(3)
memory usage: 19.1 MB
In [41]: store = pd.HDFStore('test.h5',mode='w')
In [42]: def write():
....: for i in range(10):
....: dfi = df.copy()
....: dfi.index = df.index + pd.Timedelta(minutes=i)
....: store.append('df',dfi)
....:
In [43]: %timeit -n 1 -r 1 write()
1 loops, best of 1: 4.26 s per loop
In [44]: store.close()
In [45]: pd.read_hdf('test.h5','df').info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 10000000 entries, 2013-01-01 00:00:00 to 2013-01-01 00:25:39.999000
Data columns (total 3 columns):
A float32
B float32
C float32
dtypes: float32(3)
memory usage: 190.7 MB
版本
In [46]: pd.__version__
Out[46]: u'0.17.0'
In [49]: import tables
In [50]: tables.__version__
Out[50]: '3.2.2'
In [51]: np.__version__
Out[51]: '1.10.1'