将 numpy.polyfit 应用于 xarray 数据集

Applying numpy.polyfit to xarray Dataset

Xarray是否支持polyfit等numpy计算函数?或者有没有一种有效的方法可以将这些函数应用于数据集?

示例:我想计算适合两个变量(温度和高度)的直线的斜率,以计算失效率。我有一个数据集(如下),这两个变量的维度为(垂直、时间、xgrid_0、ygrid_0)。

<xarray.Dataset>
Dimensions:    (PressLev: 7, time: 48, xgrid_0: 685, ygrid_0: 485)
Coordinates:
    gridlat_0  (ygrid_0, xgrid_0) float32 44.6896 44.6956 44.7015 44.7075 ...
    gridlon_0  (ygrid_0, xgrid_0) float32 -129.906 -129.879 -129.851 ...
  * ygrid_0    (ygrid_0) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ...
  * xgrid_0    (xgrid_0) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ...
  * time       (time) datetime64[ns] 2016-08-15T01:00:00 2016-08-15T02:00:00 ...
  * PressLev   (PressLev) int64 0 1 2 3 4 5 6
Data variables:
    Temperature       (PressLev, time, ygrid_0, xgrid_0) float64 289.4 289.4 289.4 ...
    Height       (PressLev, time, ygrid_0, xgrid_0) float64 85.23 85.13 84.98 ...

如果我提取给定时间的温度和高度,xgrid_0、ygrid_0;我可以使用 numpy.polyfit 函数。

ds_LR = ds.TMP_P0_L103_GST0 * 0 -9999 # Quick way to make dataarray with -9999 values but with correct dims/coords
for cts in np.arange(0,len(ds_UA.time)):
        for cx in ds_UA.xgrid_0.values:
                for cy in ds_UA.ygrid_0.values:
                        x_temp = ds_UA.Temperature[:,cts,cy,cx] # Grab the vertical profile of air temperature
                        y_hgt  = ds_UA.Height[:,cts,cy,cx] # Grab the vertical heights of air temperature values
                        s      = np.polyfit(y_hgt,x_temp,1) # Fit a line to the data
                        ds_LR[cts,cy,cx].values = s[0] # Grab the slope (first element)

但这是一种缓慢且低效的方法。关于更好的方法有什么建议吗?

据我所知(包括我自己),这已成为 xarray 用户中一个非常普遍的问题,并且与 this Github issue 密切相关。通常,存在某些函数的 NumPy 实现(在您的情况下,np.polyfit()),但不清楚如何最好地将此计算应用于每个网格单元,可能在多个维度上。

在地球科学背景下,有两个主要用例,一个具有简单的解决方案,另一个更复杂:

(1) 简单案例:

您有一个 temp 的 xr.DataArray,它是 (time, lat, lon) 的一个函数,您希望在每个网格框中及时找到趋势。最简单的方法是将 (lat, lon) 坐标分组为一个新坐标,按该坐标分组,然后使用 .apply() 方法。

受到 Ryan Abernathy 的 Gist 启发:<3

# Example data
da = xr.DataArray(np.random.randn(20, 180, 360),
                  dims=('time', 'lat', 'lon'),
                  coords={'time': np.linspace(0,19, 20), 
                  'lat': np.linspace(-90,90,180), 
                  'lon': np.linspace(0,359, 360)})

# define a function to compute a linear trend of a timeseries
def linear_trend(x):
    pf = np.polyfit(x.time, x, 1)
    # need to return an xr.DataArray for groupby
    return xr.DataArray(pf[0])

# stack lat and lon into a single dimension called allpoints
stacked = da.stack(allpoints=['lat','lon'])
# apply the function over allpoints to calculate the trend at each point
trend = stacked.groupby('allpoints').apply(linear_trend)
# unstack back to lat lon coordinates
trend_unstacked = trend.unstack('allpoints')

缺点:对于较大的数组,此方法变得非常慢,并且不容易适用于其他本质上可​​能非常相似的问题。这导致我们...

(2) 更难的情况(以及 OP 的问题):

你有一个 xr.Dataset 变量 tempheight,每个 (plev, time, lat, lon) 的函数,你想找到 tempheight(失效率)每 (time, lat, lon) 个点。

解决此问题的最简单方法是使用 xr.apply_ufunc(),它可以为您提供一定程度的矢量化和 dask 兼容性。 (速度!)

# Example DataArrays
da1 = xr.DataArray(np.random.randn(20, 20, 180, 360),
                   dims=('plev', 'time', 'lat', 'lon'),
                   coords={'plev': np.linspace(0,19, 20), 
                   'time': np.linspace(0,19, 20), 
                   'lat': np.linspace(-90,90,180), 
                   'lon': np.linspace(0,359, 360)})

# Create dataset
ds = xr.Dataset({'Temp': da1, 'Height': da1})

和以前一样,我们创建一个函数来计算我们需要的线性趋势:

def linear_trend(x, y):
    pf = np.polyfit(x, y, 1)
    return xr.DataArray(pf[0])

现在,我们可以使用 xr.apply_ufunc() 沿 plev 维度对 tempheight 的两个 DataArrays 进行回归!

%%time
slopes = xr.apply_ufunc(linear_trend,
                        ds.Height, ds.Temp,
                        vectorize=True,
                        input_core_dims=[['plev'], ['plev']],# reduce along 'plev'
                        )

但是,这种方法也很慢,而且和以前一样,不能很好地扩展到更大的阵列。

CPU times: user 2min 44s, sys: 2.1 s, total: 2min 46s
Wall time: 2min 48s

加快速度:

为了加快计算速度,我们可以使用 xr.DataArray.chunk()heighttemp 数据转换为 dask.arrays。这将我们的数据分成小的、易于管理的块,然后我们可以使用这些块将我们的计算与 apply_ufunc().

中的 dask=parallelized 并行化

N.B。您必须注意不要沿着要应用回归的维度分块!

dask_height = ds.Height.chunk({'lat':10, 'lon':10, 'time': 10})
dask_temp   = ds.Temp.chunk({'lat':10, 'lon':10, 'time': 10})
dask_height

<xarray.DataArray 'Height' (plev: 20, time: 20, lat: 180, lon: 360)>
dask.array<xarray-<this-array>, shape=(20, 20, 180, 360), dtype=float64, chunksize=(20, 10, 10, 10), chunktype=numpy.ndarray>
Coordinates:
  * plev     (plev) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
  * time     (time) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
  * lat      (lat) float64 -90.0 -88.99 -87.99 -86.98 ... 86.98 87.99 88.99 90.0
  * lon      (lon) int64 0 1 2 3 4 5 6 7 8 ... 352 353 354 355 356 357 358 359

现在,再算一次!

%%time
slopes_dask = xr.apply_ufunc(linear_trend,
                             dask_height, dask_temp,
                             vectorize=True,
                             dask='parallelized',
                             input_core_dims=[['plev'], ['plev']], # reduce along 'plev'
                             output_dtypes=['d'],
                             )
CPU times: user 6.55 ms, sys: 2.39 ms, total: 8.94 ms
Wall time: 9.24 ms

显着提速!

希望这对您有所帮助!我在尝试回答它时学到了很多东西:)

最佳

编辑:正如评论中所指出的,要真正比较dask和非dask方法之间的处理时间,你应该使用:

%%time
slopes_dask.compute()

这为您提供了与非 dask 方法相当的墙时间。

然而,重要的是要指出 lazily 对数据进行操作(即,除非您绝对需要它,否则不要加载它)对于处理这种大型数据集来说是更受欢迎的你在气候分析中遇到的。所以我还是建议使用dask方法,因为这样你可以在输入数组上操作很多不同的进程,每个进程只需要几个ms,然后最后才需要等待几分钟把你的成品拿出来。 :)

仅供参考,从 v0.16.0 开始,xarray 已将 polyfit 作为 DatasetDataArray 的方法以及相关的 xarray.polyval 函数实现:

https://xarray.pydata.org/en/stable/generated/xarray.Dataset.polyfit.html

https://xarray.pydata.org/en/stable/generated/xarray.DataArray.polyfit.html

https://xarray.pydata.org/en/stable/generated/xarray.polyval.html

https://xarray.pydata.org/en/stable/whats-new.html#v0-16-0-2020-07-11