加快将数十亿行写入 HDF5

Speed up writing billions of rows to HDF5

这是我在问题 中试图讨论的场景的延续。请阅读问题以获取有关以下内容的更多详细信息。

由于上面的链接问题由于主题过于宽泛而被关闭,我没有机会从处理数百 GB 数据方面更有经验的人那里收集想法。我对此没有任何经验,而且我正在学习。我显然在某处犯了一些错误,因为我的方法花费的时间太长无法完成。

数据如我在上面的链接问题中所述。我决定为每个传感器创建一个节点(组)(以传感器 ID 作为节点名称,在 root 下)来存储我拥有的 260k 个传感器中的每一个生成的数据。该文件最终将包含 260k 个节点,每个节点将在其下的 Table 中存储几 GB 的数据。完成所有繁重工作的代码如下:

with pd.HDFStore(hdf_path, mode='w') as hdf_store:
    for file in files:
        # Read CSV files in Pandas
        fp = os.path.normpath(os.path.join(path, str(file).zfill(2)) + '.csv')
        df = pd.read_csv(fp, names=data_col_names, skiprows=1, header=None,
                         chunksize=chunk_size, dtype=data_dtype)

        for chunk in df:
            # Manipulate date & epoch to get it in human readable form
            chunk['DATE'] = pd.to_datetime(chunk['DATE'], format='%m%d%Y', box=False)
            chunk['EPOCH'] = pd.to_timedelta(chunk['EPOCH']*5, unit='m')
            chunk['DATETIME'] = chunk['DATE'] + chunk['EPOCH']

            #Group on Sensor to store in HDF5 file
            grouped = chunk.groupby('Sensor')
            for group, data in grouped:
                data.index = data['DATETIME']
                hdf_store.append(group, data.loc[:,['R1', 'R2', 'R3']])

    # Adding sensor information as metadata to nodes
    for sens in sensors:
        try:
            hdf_store.get_storer(sens).attrs.metadata = sens_dict[sens]
            hdf_store.get_storer(sens).attrs['TITLE'] = sens
        except AttributeError:
            pass

如果我注释掉行 hdf_store.append(group, data.loc[:,['R1', 'R2', 'R3']])for chunk in df: 下的位需要大约 40 - 45 秒 来完成处理迭代。 (我正在读取的块大小是 1M 行。)但是代码中包含该行(也就是说,如果将分组块写入 HDF 文件)代码大约需要 10 - 12 分钟 每次迭代。我对执行时间的增加感到非常困惑。我不知道是什么导致了这种情况。

请给我一些解决问题的建议。请注意,我无法承受那么长的执行时间。我需要以这种方式处理大约 220 GB 的数据。稍后我需要查询该数据,一次查询一个节点,以进行进一步分析。我已经花了 4 天多的时间研究这个主题,但我仍然像刚开始时一样难过。

#### 编辑 1 #### 对于包含 1M 行的块,包括 df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1000000 entries, 0 to 999999
Data columns (total 7 columns):
SENSOR      1000000 non-null object
DATE        1000000 non-null datetime64[ns]
EPOCH       1000000 non-null timedelta64[ns]
R1          1000000 non-null float32
R2          773900 non-null float32
R3          483270 non-null float32
DATETIME    1000000 non-null datetime64[ns]
dtypes: datetime64[ns](2), float32(3), object(1), timedelta64[ns](1)
memory usage: 49.6+ MB

其中,只有 DATETIME, R1, R2, R3 被写入文件。

#### 编辑 2 #### 包括 pd.show_versions()

 In [ ] : pd.show_versions()
Out [ ] : INSTALLED VERSIONS
          ------------------
          commit: None
          python: 3.4.3.final.0
          python-bits: 64
          OS: Windows
          OS-release: 8
          machine: AMD64
          processor: Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
          byteorder: little
          LC_ALL: None
          LANG: None

          pandas: 0.17.0
          nose: 1.3.7
          pip: 7.1.2
          setuptools: 18.4
          Cython: 0.23.2
          numpy: 1.10.1
          scipy: 0.16.0
          statsmodels: 0.6.1
          IPython: 4.0.0
          sphinx: 1.3.1
          patsy: 0.4.0
          dateutil: 2.4.1
          pytz: 2015.6
          blosc: None
          bottleneck: 1.0.0
          tables: 3.2.2
          numexpr: 2.4.4
          matplotlib: 1.4.3
          openpyxl: 2.0.2
          xlrd: 0.9.4
          xlwt: 1.0.0
          xlsxwriter: 0.7.3
          lxml: 3.4.4
          bs4: 4.3.2
          html5lib: None
          httplib2: None
          apiclient: None
          sqlalchemy: 1.0.8
          pymysql: None
          psycopg2: None

您不断地对您写入的行进行索引。写入所有行然后创建索引效率更高。

请参阅有关创建索引的文档 here

追加操作通过index=False;这将关闭索引。

然后当你最终完成时,运行(在每个节点上),假设 store 是你的 HDFStore

store.create_table_index('node')

此操作需要一些时间,但不会连续执行一次,而是会执行一次。这产生了巨大的差异,因为创建可以考虑到您的所有数据(并且只移动一次)。

您可能还想 ptrepack 您的数据(在索引操作之前或之后),以重置 chunksize。我不会直接指定它,而是设置 chunksize='auto' 让它在写入所有数据后找出最佳大小。

所以这应该是一个相当快的操作(即使有索引)。

In [38]: N = 1000000

In [39]: df = DataFrame(np.random.randn(N,3).astype(np.float32),columns=list('ABC'),index=pd.date_range('20130101',freq='ms',periods=N))

In [40]: df.info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1000000 entries, 2013-01-01 00:00:00 to 2013-01-01 00:16:39.999000
Freq: L
Data columns (total 3 columns):
A    1000000 non-null float32
B    1000000 non-null float32
C    1000000 non-null float32
dtypes: float32(3)
memory usage: 19.1 MB

In [41]: store = pd.HDFStore('test.h5',mode='w')

In [42]: def write():
   ....:     for i in range(10):
   ....:         dfi = df.copy()
   ....:         dfi.index = df.index + pd.Timedelta(minutes=i)
   ....:         store.append('df',dfi)
   ....:         

In [43]: %timeit -n 1 -r 1 write()
1 loops, best of 1: 4.26 s per loop

In [44]: store.close()

In [45]: pd.read_hdf('test.h5','df').info()
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 10000000 entries, 2013-01-01 00:00:00 to 2013-01-01 00:25:39.999000
Data columns (total 3 columns):
A    float32
B    float32
C    float32
dtypes: float32(3)
memory usage: 190.7 MB

版本

In [46]: pd.__version__
Out[46]: u'0.17.0'

In [49]: import tables

In [50]: tables.__version__
Out[50]: '3.2.2'

In [51]: np.__version__
Out[51]: '1.10.1'