How to make a memory-efficient multi-dimensional groupby/stack using xarray?
I have a large np.float64 time series at 5-minute frequency (size ~2,500,000, i.e. roughly 24 years).
I represent it in memory using Xarray, with the time dimension named 'time'.
I want to group by 'time.hour' and then by 'time.dayofyear' (or vice versa) and remove their means from the time series.
To do this efficiently, I need to reorder the time series into a new xr.DataArray with the dimensions ['hour', 'dayofyear', 'rest'].
I wrote a function that works on Xarray's GroupBy objects and manages to do this, though it takes a huge amount of memory...
I have a machine with 32GB of RAM and I still get a MemoryError from numpy.
I know the code works, because I used it on an hourly-resampled version of my original time series. So here is the code:
def time_series_stack(time_da, time_dim='time', grp1='hour', grp2='dayofyear'):
"""Takes a time-series xr.DataArray objects and reshapes it using
grp1 and grp2. outout is a xr.Dataset that includes the reshaped DataArray
, its datetime-series and the grps."""
import xarray as xr
import numpy as np
import pandas as pd
# try to infer the freq and put it into attrs for later reconstruction:
freq = pd.infer_freq(time_da[time_dim].values)
name = time_da.name
time_da.attrs['freq'] = freq
attrs = time_da.attrs
# drop all NaNs:
time_da = time_da.dropna(time_dim)
# group grp1 and concat:
grp_obj1 = time_da.groupby(time_dim + '.' + grp1)
s_list = []
for grp_name, grp_inds in grp_obj1.groups.items():
da = time_da.isel({time_dim: grp_inds})
s_list.append(da)
grps1 = [x for x in grp_obj1.groups.keys()]
stacked_da = xr.concat(s_list, dim=grp1)
stacked_da[grp1] = grps1
# group over the concatenated da and concat again:
grp_obj2 = stacked_da.groupby(time_dim + '.' + grp2)
s_list = []
for grp_name, grp_inds in grp_obj2.groups.items():
da = stacked_da.isel({time_dim: grp_inds})
s_list.append(da)
grps2 = [x for x in grp_obj2.groups.keys()]
stacked_da = xr.concat(s_list, dim=grp2)
stacked_da[grp2] = grps2
# numpy part:
# first, loop over both dims and drop NaNs, append values and datetimes:
vals = []
dts = []
for i, grp1_val in enumerate(stacked_da[grp1]):
da = stacked_da.sel({grp1: grp1_val})
for j, grp2_val in enumerate(da[grp2]):
val = da.sel({grp2: grp2_val}).dropna(time_dim)
vals.append(val.values)
dts.append(val[time_dim].values)
# second, we get the max of the vals after the second groupby:
max_size = max([len(x) for x in vals])
# we fill NaNs and NaT for the remainder of them:
concat_sizes = [max_size - len(x) for x in vals]
concat_arrys = [np.empty((x)) * np.nan for x in concat_sizes]
concat_vals = [np.concatenate(x) for x in list(zip(vals, concat_arrys))]
# 1970-01-01 is the NaT for this time-series:
concat_arrys = [np.zeros((x), dtype='datetime64[ns]')
for x in concat_sizes]
concat_dts = [np.concatenate(x) for x in list(zip(dts, concat_arrys))]
concat_vals = np.array(concat_vals)
concat_dts = np.array(concat_dts)
    # finally, we reshape them:
concat_vals = concat_vals.reshape((stacked_da[grp1].shape[0],
stacked_da[grp2].shape[0],
max_size))
concat_dts = concat_dts.reshape((stacked_da[grp1].shape[0],
stacked_da[grp2].shape[0],
max_size))
# create a Dataset and DataArrays for them:
sda = xr.Dataset()
sda.attrs = attrs
sda[name] = xr.DataArray(concat_vals, dims=[grp1, grp2, 'rest'])
sda[time_dim] = xr.DataArray(concat_dts, dims=[grp1, grp2, 'rest'])
sda[grp1] = grps1
sda[grp2] = grps2
sda['rest'] = range(max_size)
return sda
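For context, here is a minimal sketch of how the function can be called on an hourly-resampled version of the series (the '1H' frequency string is only an example):
# downsample the 5-minute series to hourly means, then stack it:
hourly_da = time_da.resample(time='1H').mean()
sda = time_series_stack(hourly_da, time_dim='time', grp1='hour', grp2='dayofyear')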
So for a time series of 2,500,000 items, numpy throws a MemoryError, and I guess that must be my memory bottleneck. What can I do to solve this?
Can Dask help me? If so, how should I implement it?
Like you, I ran it without problems when feeding in a small time series (10,000 long). However, when feeding in a 100,000-long time series xr.DataArray, the grp_obj2 for-loop ran away and used all of the system's memory.
This is the xr.DataArray I used to generate the time series:
n = 10**5
times = np.datetime64('2000-01-01') + np.arange(n) * np.timedelta64(5,'m')
data = np.random.randn(n)
time_da = xr.DataArray(data, name='rand_data', dims=('time'), coords={'time': times})
# time_da.to_netcdf('rand_time_series.nc')
As you pointed out, Dask is one way to solve it, but I can't see a clear path to it at the moment...
Typically, the kind of problem Dask works well with looks like this:
- Make the input a dataset read from a file (such as NetCDF). This doesn't load the file into memory, but lets Dask pull data from disk one chunk at a time (see the sketch after this list).
- Use dask.delayed or dask.futures methods to define the whole body of computation, all the way up to writing the output. This is what lets Dask read, then write, one small chunk of data at a time.
- Do one chunk of work and immediately write the output to a new dataset file. Effectively you end up turning one chunk of input into one chunk of output at a time (and it is threaded/parallelized).
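As a minimal sketch of the first bullet (assuming the series was saved to 'rand_time_series.nc' as in the commented-out line above, and an arbitrarily chosen chunk size of 10,000):
import xarray as xr

# open lazily, backed by dask arrays, one 10,000-sample chunk at a time:
time_da = xr.open_dataarray('rand_time_series.nc', chunks={'time': 10000})
# operations now build a dask graph instead of computing eagerly;
# .compute() (or writing with .to_netcdf) triggers the actual chunked work:
hourly_means = time_da.groupby('time.hour').mean('time').compute()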
I tried importing Dask and splitting the input time_da xr.DataArray into chunks for Dask to work on, but it didn't help. From what I can tell, the line stacked_da = xr.concat(s_list, dim=grp1) forces Dask to make a full copy of stacked_da in memory, and so on...
One workaround is to write stacked_da to disk and then immediately read it back in:
##For group1
xr.concat(s_list, dim=grp1).to_netcdf('stacked_da1.nc')
stacked_da = xr.load_dataset('stacked_da1.nc')
stacked_da[grp1] = grps1
##For group2
xr.concat(s_list, dim=grp2).to_netcdf('stacked_da2.nc')
stacked_da = xr.load_dataset('stacked_da2.nc')
stacked_da[grp2] = grps2
However, the file size of stacked_da1.nc is 19MB, whereas stacked_da2.nc is 6.5GB, and that is with a time_da of only 100,000 elements... so clearly something is wrong there (most likely each concat aligns the groups on the full time coordinate and pads with NaNs, so the array grows by a factor of the number of groups).
Originally it sounded like you want to subtract the mean of each group from the time series data. It looks like the Xarray docs have an example of this: http://xarray.pydata.org/en/stable/groupby.html#grouped-arithmetic
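For reference, a minimal sketch of that grouped-arithmetic approach applied to the original goal (removing the hourly mean and then the day-of-year mean directly from the 1-D series, with no reshaping; time_da is the series from the snippet above):
# subtract the hourly climatology, then the day-of-year climatology:
anom = time_da.groupby('time.hour') - time_da.groupby('time.hour').mean('time')
anom = anom.groupby('time.dayofyear') - anom.groupby('time.dayofyear').mean('time')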
The key is to group once and loop over the groups, then group again within each group and append the result to a list.
Next I use pd.MultiIndex.from_product for the groups.
No memory problems, no need for Dask, and it only takes a few seconds to run.
Here is the code, enjoy:
def time_series_stack(time_da, time_dim='time', grp1='hour', grp2='month',
plot=True):
"""Takes a time-series xr.DataArray objects and reshapes it using
grp1 and grp2. output is a xr.Dataset that includes the reshaped DataArray
, its datetime-series and the grps. plots the mean also"""
import xarray as xr
import pandas as pd
# try to infer the freq and put it into attrs for later reconstruction:
freq = pd.infer_freq(time_da[time_dim].values)
name = time_da.name
time_da.attrs['freq'] = freq
attrs = time_da.attrs
# drop all NaNs:
time_da = time_da.dropna(time_dim)
# first grouping:
grp_obj1 = time_da.groupby(time_dim + '.' + grp1)
da_list = []
t_list = []
for grp1_name, grp1_inds in grp_obj1.groups.items():
da = time_da.isel({time_dim: grp1_inds})
# second grouping:
grp_obj2 = da.groupby(time_dim + '.' + grp2)
for grp2_name, grp2_inds in grp_obj2.groups.items():
da2 = da.isel({time_dim: grp2_inds})
# extract datetimes and rewrite time coord to 'rest':
times = da2[time_dim]
times = times.rename({time_dim: 'rest'})
times.coords['rest'] = range(len(times))
t_list.append(times)
da2 = da2.rename({time_dim: 'rest'})
da2.coords['rest'] = range(len(da2))
da_list.append(da2)
# get group keys:
grps1 = [x for x in grp_obj1.groups.keys()]
grps2 = [x for x in grp_obj2.groups.keys()]
# concat and convert to dataset:
stacked_ds = xr.concat(da_list, dim='all').to_dataset(name=name)
stacked_ds[time_dim] = xr.concat(t_list, 'all')
# create a multiindex for the groups:
mindex = pd.MultiIndex.from_product([grps1, grps2], names=[grp1, grp2])
stacked_ds.coords['all'] = mindex
# unstack:
ds = stacked_ds.unstack('all')
ds.attrs = attrs
return ds
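A quick usage sketch (reusing the rand_data series generated earlier) showing how the group means can then be removed from the reshaped output:
ds = time_series_stack(time_da, time_dim='time', grp1='hour', grp2='month')
# the per-(hour, month) means are taken along the 'rest' dimension:
grp_means = ds['rand_data'].mean('rest')
anomalies = ds['rand_data'] - grp_means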