Pytables 高效读取和处理数以千计的组
Pytables efficiently read and process thousands of groups
这应该很简单,但计算这个我要花很长时间,而且我想不出任何其他方法来处理我的文件。如果有任何想法,我将不胜感激。
TL;DR: I am looking for a way to shortcut for key in store.keys()
and run the same analysis on all data contained within
each node in a HDF file with 261k nodes (aka keys, groups...) such that each node is processed independent of the others.
我有一个H5文件在磁盘上,有几百GB。该文件包含数十万个节点(准确地说是 261k),我想使用相同的方法分别处理它们。每个节点(或组)包含一个 table,带有一个日期时间索引和三个浮点数据列。我想计算每个 table 中每一列的分位数。基本上,这是 H5 文件的样子($ ptdump -av
的部分输出):
/ (RootGroup) ''
/._v_attrs (AttributeSet), 4 attributes:
[CLASS := 'GROUP',
PYTABLES_FORMAT_VERSION := '2.1',
TITLE := '',
VERSION := '1.0']
/101P09999 (Group) '101P09999'
/101P09999._v_attrs (AttributeSet), 15 attributes:
[CLASS := 'GROUP',
TITLE := '101P09999',
VERSION := '1.0',
data_columns := [],
encoding := 'UTF-8',
index_cols := [(0, 'index')],
info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'DATETIME'}},
levels := 1,
metadata := {'STATE': 'Georgia', 'LENGTH': 4.86258, 'COUNTRY': 'USA', 'ROAD_NUMBER': 'US-27/GA-1', 'LATITUDE': 34.88279, 'COUNTY': 'Walker', 'LONGITUDE': -85.27023, 'ROAD_NAME': 'Lafayette Rd/Martha Berry Hwy', 'DIRECTION': 'Northbound'},
nan_rep := 'nan',
non_index_axes := [(1, ['TTAV', 'TTPC', 'TTFT'])],
pandas_type := 'frame_table',
pandas_version := '0.15.2',
table_type := 'appendable_frame',
values_cols := ['values_block_0']]
/101P09999/table (Table(2345,), shuffle, blosc(5)) ''
description := {
"index": Int64Col(shape=(), dflt=0, pos=0),
"values_block_0": Float32Col(shape=(3,), dflt=0.0, pos=1)}
byteorder := 'little'
chunkshape := (3276,)
autoindex := True
colindexes := {
"index": Index(6, medium, shuffle, zlib(1)).is_csi=False}
/101P09999/table._v_attrs (AttributeSet), 12 attributes:
[CLASS := 'TABLE',
FIELD_0_FILL := 0,
FIELD_0_NAME := 'index',
FIELD_1_FILL := 0.0,
FIELD_1_NAME := 'values_block_0',
NROWS := 2345,
TITLE := '',
VERSION := '2.7',
index_kind := 'datetime64',
values_block_0_dtype := 'float32',
values_block_0_kind := ['TTAV', 'TTPC', 'TTFT'],
values_block_0_meta := None]
NOTE: Don't pay attention to the NROWS
value in the above output. This output is just for February 2014, I have the data from
all 12 months in the master table.
对,我想做的是将列 ['TTAV', 'TTPC', 'TTFT']
中的数据除以组属性 metadata['LENGTH']
(在本例中为 4.86258)。接下来我想根据数据的时间戳将其分为6组。然后我想在这 6 个组中的每一个中计算分位数。
我现在的是新手做法:
with pd.HDFStore(store_path, 'r') as store:
for key in store.keys()
sens_data = store[key]
# split_data = split the data into the required groups based on time stamp...
for data in split_data:
data /= store.get_storer(key).attrs.metadata['LENGTH']
perc = split_data.quantile(q=np.arange(0.05, 1, 0.05)).transpose()
# Create a column to contain sensor name:
perc[0] = key[1:]
perc.set_index(0, append=True, inplace=True)
perc.index.rename(['DATA COL', 'SENS NAME'], inplace=True)
# Merge perc into a dictionary of dataframes with keys the groups
# the data was split into, and value a dataframe of appended percs
因此,最终,数据框字典将如下所示:
In [1]: percentiles['night']
Out[1]: 0.05 0.10 ... 0.90 0.95
DATA COL SENS NAME
101P09999 115 118 ... 133 135
TTAV 101P10000 95 100 ... 120 125
101P10001 108 109 ... 111 113
...
101P09999 110 112 ... 133 135
TTPC 101P10000 115 118 ... 133 135
101P10001 115 118 ... 133 135
...
101P09999 115 118 ... 133 135
TTFT 101P10000 115 118 ... 133 135
101P10001 115 118 ... 133 135
... ...
(请原谅我没有为其余行输入随机数据。另外,虽然我在我的示例 DataFrame 中输入了 int
值,但这些值实际上是 float32
,如图所示在 ptdump
输出中。)但这是上面代码末尾所有组(晚上、早上、下午...)的样子。所以基本上,每个 ['TTAV', 'TTPC', 'TTFT']
会有 261k 行,每个对应一个传感器。
现在很明显,for key in store.keys()
不是正确的选择。 (检索所有密钥大约需要 15 分钟!)我发现了无数示例,其中仅检索和处理了一组和 table 的数据,但没有帮助处理所有组。有什么想法吗?并行读取是可以的,虽然我还没有让它工作(它抛出一个 UnImplimented 错误,当我使用键访问单独进程中节点内的数据时。也许我还需要将存储传递给函数)。然而,最大的问题是 for key in store.keys()
需要 15 分钟才能 return 列出所有键。 (请注意,我在处理时不需要所有键的列表,我很乐意在解析文件时获取数据。)
生活还是美好的。我想出了如何同时读取 HDF 文件的方法。所以现在等待 15 分钟来生成所有密钥的列表似乎并不算太糟糕。基本上,这就是现在发生的事情:
def reader(key)
with pd.HDFStore(store_path, 'r') as store:
sens_data = store[key]
# And other awesome stuff here to manipulate the data
# Closing with returning the analyzed data, in my case, percentile computation:
return dict_of_dfs
if __name__ == '__main__':
with Pool(processes=4) as pool:
res_pool = pool.map(reader, sens_names) # I populate sens_names by store.keys() up above.
for key in dict_of_dfs:
dict_of_dfs[key] = dict_of_dfs[key].append(d[key] for d in res_pool)
瞧!比单线程快得多!
Ps。我仍然想知道如何完全避免 store.keys()
。
这应该很简单,但计算这个我要花很长时间,而且我想不出任何其他方法来处理我的文件。如果有任何想法,我将不胜感激。
TL;DR: I am looking for a way to shortcut
for key in store.keys()
and run the same analysis on all data contained within each node in a HDF file with 261k nodes (aka keys, groups...) such that each node is processed independent of the others.
我有一个H5文件在磁盘上,有几百GB。该文件包含数十万个节点(准确地说是 261k),我想使用相同的方法分别处理它们。每个节点(或组)包含一个 table,带有一个日期时间索引和三个浮点数据列。我想计算每个 table 中每一列的分位数。基本上,这是 H5 文件的样子($ ptdump -av
的部分输出):
/ (RootGroup) ''
/._v_attrs (AttributeSet), 4 attributes:
[CLASS := 'GROUP',
PYTABLES_FORMAT_VERSION := '2.1',
TITLE := '',
VERSION := '1.0']
/101P09999 (Group) '101P09999'
/101P09999._v_attrs (AttributeSet), 15 attributes:
[CLASS := 'GROUP',
TITLE := '101P09999',
VERSION := '1.0',
data_columns := [],
encoding := 'UTF-8',
index_cols := [(0, 'index')],
info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'DATETIME'}},
levels := 1,
metadata := {'STATE': 'Georgia', 'LENGTH': 4.86258, 'COUNTRY': 'USA', 'ROAD_NUMBER': 'US-27/GA-1', 'LATITUDE': 34.88279, 'COUNTY': 'Walker', 'LONGITUDE': -85.27023, 'ROAD_NAME': 'Lafayette Rd/Martha Berry Hwy', 'DIRECTION': 'Northbound'},
nan_rep := 'nan',
non_index_axes := [(1, ['TTAV', 'TTPC', 'TTFT'])],
pandas_type := 'frame_table',
pandas_version := '0.15.2',
table_type := 'appendable_frame',
values_cols := ['values_block_0']]
/101P09999/table (Table(2345,), shuffle, blosc(5)) ''
description := {
"index": Int64Col(shape=(), dflt=0, pos=0),
"values_block_0": Float32Col(shape=(3,), dflt=0.0, pos=1)}
byteorder := 'little'
chunkshape := (3276,)
autoindex := True
colindexes := {
"index": Index(6, medium, shuffle, zlib(1)).is_csi=False}
/101P09999/table._v_attrs (AttributeSet), 12 attributes:
[CLASS := 'TABLE',
FIELD_0_FILL := 0,
FIELD_0_NAME := 'index',
FIELD_1_FILL := 0.0,
FIELD_1_NAME := 'values_block_0',
NROWS := 2345,
TITLE := '',
VERSION := '2.7',
index_kind := 'datetime64',
values_block_0_dtype := 'float32',
values_block_0_kind := ['TTAV', 'TTPC', 'TTFT'],
values_block_0_meta := None]
NOTE: Don't pay attention to the
NROWS
value in the above output. This output is just for February 2014, I have the data from all 12 months in the master table.
对,我想做的是将列 ['TTAV', 'TTPC', 'TTFT']
中的数据除以组属性 metadata['LENGTH']
(在本例中为 4.86258)。接下来我想根据数据的时间戳将其分为6组。然后我想在这 6 个组中的每一个中计算分位数。
我现在的是新手做法:
with pd.HDFStore(store_path, 'r') as store:
for key in store.keys()
sens_data = store[key]
# split_data = split the data into the required groups based on time stamp...
for data in split_data:
data /= store.get_storer(key).attrs.metadata['LENGTH']
perc = split_data.quantile(q=np.arange(0.05, 1, 0.05)).transpose()
# Create a column to contain sensor name:
perc[0] = key[1:]
perc.set_index(0, append=True, inplace=True)
perc.index.rename(['DATA COL', 'SENS NAME'], inplace=True)
# Merge perc into a dictionary of dataframes with keys the groups
# the data was split into, and value a dataframe of appended percs
因此,最终,数据框字典将如下所示:
In [1]: percentiles['night']
Out[1]: 0.05 0.10 ... 0.90 0.95
DATA COL SENS NAME
101P09999 115 118 ... 133 135
TTAV 101P10000 95 100 ... 120 125
101P10001 108 109 ... 111 113
...
101P09999 110 112 ... 133 135
TTPC 101P10000 115 118 ... 133 135
101P10001 115 118 ... 133 135
...
101P09999 115 118 ... 133 135
TTFT 101P10000 115 118 ... 133 135
101P10001 115 118 ... 133 135
... ...
(请原谅我没有为其余行输入随机数据。另外,虽然我在我的示例 DataFrame 中输入了 int
值,但这些值实际上是 float32
,如图所示在 ptdump
输出中。)但这是上面代码末尾所有组(晚上、早上、下午...)的样子。所以基本上,每个 ['TTAV', 'TTPC', 'TTFT']
会有 261k 行,每个对应一个传感器。
现在很明显,for key in store.keys()
不是正确的选择。 (检索所有密钥大约需要 15 分钟!)我发现了无数示例,其中仅检索和处理了一组和 table 的数据,但没有帮助处理所有组。有什么想法吗?并行读取是可以的,虽然我还没有让它工作(它抛出一个 UnImplimented 错误,当我使用键访问单独进程中节点内的数据时。也许我还需要将存储传递给函数)。然而,最大的问题是 for key in store.keys()
需要 15 分钟才能 return 列出所有键。 (请注意,我在处理时不需要所有键的列表,我很乐意在解析文件时获取数据。)
生活还是美好的。我想出了如何同时读取 HDF 文件的方法。所以现在等待 15 分钟来生成所有密钥的列表似乎并不算太糟糕。基本上,这就是现在发生的事情:
def reader(key)
with pd.HDFStore(store_path, 'r') as store:
sens_data = store[key]
# And other awesome stuff here to manipulate the data
# Closing with returning the analyzed data, in my case, percentile computation:
return dict_of_dfs
if __name__ == '__main__':
with Pool(processes=4) as pool:
res_pool = pool.map(reader, sens_names) # I populate sens_names by store.keys() up above.
for key in dict_of_dfs:
dict_of_dfs[key] = dict_of_dfs[key].append(d[key] for d in res_pool)
瞧!比单线程快得多!
Ps。我仍然想知道如何完全避免 store.keys()
。