如何使用 xarray 使内存高效多维 groupby/stack?

how to make a memory efficient multiple dimension groupby/stack using xarray?

我有一个频率为 5 分钟的大型 np.float64 时间序列(大小为 ~2,500,000 ~=24 年)。

我使用 Xarray 在内存中表示它,时间维度被命名为 'time'

我想按 'time.hour' 分组,然后按 'time.dayofyear' 分组(反之亦然),并从时间序列中删除它们的均值。

为了有效地做到这一点,我需要将时间序列重新排序为一个新的 xr.DataArray,维度为 ['hour', 'dayofyear', 'rest']

我编写了一个函数来处理 XarrayGroupBy 对象并设法做到这一点,尽管这样做需要大量内存...

我有一台 32GB RAM 的机器,但我仍然从 numpy 得到 MemoryError

我知道该代码有效,因为我在我的原始时间序列的每小时重新采样版本上使用了它。所以这是代码:

def time_series_stack(time_da, time_dim='time', grp1='hour', grp2='dayofyear'):
    """Takes a time-series xr.DataArray objects and reshapes it using
    grp1 and grp2. outout is a xr.Dataset that includes the reshaped DataArray
    , its datetime-series and the grps."""
    import xarray as xr
    import numpy as np
    import pandas as pd

    # try to infer the freq and put it into attrs for later reconstruction:
    freq = pd.infer_freq(time_da[time_dim].values)
    name = time_da.name
    time_da.attrs['freq'] = freq
    attrs = time_da.attrs

    # drop all NaNs:
    time_da = time_da.dropna(time_dim)

    # group grp1 and concat:
    grp_obj1 = time_da.groupby(time_dim + '.' + grp1)
    s_list = []
    for grp_name, grp_inds in grp_obj1.groups.items():
        da = time_da.isel({time_dim: grp_inds})
        s_list.append(da)
    grps1 = [x for x in grp_obj1.groups.keys()]
    stacked_da = xr.concat(s_list, dim=grp1)
    stacked_da[grp1] = grps1

    # group over the concatenated da and concat again:
    grp_obj2 = stacked_da.groupby(time_dim + '.' + grp2)
    s_list = []
    for grp_name, grp_inds in grp_obj2.groups.items():
        da = stacked_da.isel({time_dim: grp_inds})
        s_list.append(da)
    grps2 = [x for x in grp_obj2.groups.keys()]
    stacked_da = xr.concat(s_list, dim=grp2)
    stacked_da[grp2] = grps2

    # numpy part:
    # first, loop over both dims and drop NaNs, append values and datetimes:
    vals = []
    dts = []
    for i, grp1_val in enumerate(stacked_da[grp1]):
        da = stacked_da.sel({grp1: grp1_val})
        for j, grp2_val in enumerate(da[grp2]):
            val = da.sel({grp2: grp2_val}).dropna(time_dim)
            vals.append(val.values)
            dts.append(val[time_dim].values)

    # second, we get the max of the vals after the second groupby:
    max_size = max([len(x) for x in vals])

    # we fill NaNs and NaT for the remainder of them:
    concat_sizes = [max_size - len(x) for x in vals]
    concat_arrys = [np.empty((x)) * np.nan for x in concat_sizes]
    concat_vals = [np.concatenate(x) for x in list(zip(vals, concat_arrys))]
    # 1970-01-01 is the NaT for this time-series:
    concat_arrys = [np.zeros((x), dtype='datetime64[ns]')
                    for x in concat_sizes]
    concat_dts = [np.concatenate(x) for x in list(zip(dts, concat_arrys))]
    concat_vals = np.array(concat_vals)
    concat_dts = np.array(concat_dts)

    # finally , we reshape them:
    concat_vals = concat_vals.reshape((stacked_da[grp1].shape[0],
                                       stacked_da[grp2].shape[0],
                                       max_size))
    concat_dts = concat_dts.reshape((stacked_da[grp1].shape[0],
                                     stacked_da[grp2].shape[0],
                                     max_size))

    # create a Dataset and DataArrays for them:
    sda = xr.Dataset()
    sda.attrs = attrs
    sda[name] = xr.DataArray(concat_vals, dims=[grp1, grp2, 'rest'])
    sda[time_dim] = xr.DataArray(concat_dts, dims=[grp1, grp2, 'rest'])
    sda[grp1] = grps1
    sda[grp2] = grps2
    sda['rest'] = range(max_size)
    return sda

所以对于 2,500,000 个项目的时间序列,numpy 抛出 MemoryError 所以我猜这一定是我的记忆瓶颈。我该怎么做才能解决这个问题?

Dask 可以帮助我吗?如果可以,我该如何实施?

像你一样,我 运行 输入小时间序列(10,000 长)时没有问题。但是,当输入一个 100,000 长的时间序列 xr.DataArray 时,grp_obj2 for 循环 运行 消失并使用了系统的所有内存。

这是我用来生成时间序列的xr.DataArray:

n = 10**5
times = np.datetime64('2000-01-01') + np.arange(n) * np.timedelta64(5,'m')
data = np.random.randn(n)
time_da = xr.DataArray(data, name='rand_data', dims=('time'), coords={'time': times})
# time_da.to_netcdf('rand_time_series.nc')

正如您所指出的,Dask 是解决它的一种方法,但我目前看不到明确的路径... 通常,Dask 的问题类型是:

  1. 使输入成为来自文件(如 NetCDF)的数据集。这不会将文件加载到内存中,但允许 Dask 一次一个块地从磁盘中提取数据。
  2. 使用 dask.delayeddask.futures 方法为整个代码体定义所有计算,直到写入输出。这就是允许 Dask 将一小块数据分块以读取然后写入的原因。
  3. 计算一大块工作并立即将输出写入新的数据集文件。实际上,您最终一次将一大块输入转化为一大块输出(而且 threaded/parallelized)。

我尝试导入 Dask 并将输入 time_da xr.DataArray 分成块供 Dask 处理,但没有帮助。据我所知,行 stacked_da = xr.concat(s_list, dim=grp1) 强制 Dask 在内存中制作 stacked_da 的完整副本等等...... 一种解决方法是将 stacked_da 写入磁盘然后立即再次读取它:

##For group1
xr.concat(s_list, dim=grp1).to_netcdf('stacked_da1.nc')
stacked_da = xr.load_dataset('stacked_da1.nc')
stacked_da[grp1] = grps1

##For group2
xr.concat(s_list, dim=grp2).to_netcdf('stacked_da2.nc')
stacked_da = xr.load_dataset('stacked_da2.nc')
stacked_da[grp2] = grps2

但是,stacked_da1.nc 的文件大小为 19MB,而 stacked_da2.nc 的文件大小为 6.5GB。这是 time_da 有 100,000 个元素的...所以显然有些不对劲...

最初,听起来您想从时间序列数据中减去各组的均值。看起来 Xarray 文档有一个例子。 http://xarray.pydata.org/en/stable/groupby.html#grouped-arithmetic

关键是分组一次并遍历这些组,然后在每个组上再次分组并将其附加到列表中。

接下来我将 pd.MultiIndex.from_product 用于组。

没有内存问题,不需要 Dask,只需要几秒钟就可以 运行。

这是代码,享受:

def time_series_stack(time_da, time_dim='time', grp1='hour', grp2='month',
                      plot=True):
    """Takes a time-series xr.DataArray objects and reshapes it using
    grp1 and grp2. output is a xr.Dataset that includes the reshaped DataArray
    , its datetime-series and the grps. plots the mean also"""
    import xarray as xr
    import pandas as pd
    # try to infer the freq and put it into attrs for later reconstruction:
    freq = pd.infer_freq(time_da[time_dim].values)
    name = time_da.name
    time_da.attrs['freq'] = freq
    attrs = time_da.attrs
    # drop all NaNs:
    time_da = time_da.dropna(time_dim)
    # first grouping:
    grp_obj1 = time_da.groupby(time_dim + '.' + grp1)
    da_list = []
    t_list = []
    for grp1_name, grp1_inds in grp_obj1.groups.items():
        da = time_da.isel({time_dim: grp1_inds})
        # second grouping:
        grp_obj2 = da.groupby(time_dim + '.' + grp2)
        for grp2_name, grp2_inds in grp_obj2.groups.items():
            da2 = da.isel({time_dim: grp2_inds})
            # extract datetimes and rewrite time coord to 'rest':
            times = da2[time_dim]
            times = times.rename({time_dim: 'rest'})
            times.coords['rest'] = range(len(times))
            t_list.append(times)
            da2 = da2.rename({time_dim: 'rest'})
            da2.coords['rest'] = range(len(da2))
            da_list.append(da2)
    # get group keys:
    grps1 = [x for x in grp_obj1.groups.keys()]
    grps2 = [x for x in grp_obj2.groups.keys()]
    # concat and convert to dataset:
    stacked_ds = xr.concat(da_list, dim='all').to_dataset(name=name)
    stacked_ds[time_dim] = xr.concat(t_list, 'all')
    # create a multiindex for the groups:
    mindex = pd.MultiIndex.from_product([grps1, grps2], names=[grp1, grp2])
    stacked_ds.coords['all'] = mindex
    # unstack:
    ds = stacked_ds.unstack('all')
    ds.attrs = attrs
    return ds