分块创建 xarray DataArray 并将其写入 NetCDF

Create and write xarray DataArray to NetCDF in chunks

是否也可以创建一个核外 DataArray,并使用 xarray 将其逐块写入 NetCDF4 文件?

例如,我希望能够在维度更大时以核外方式执行此操作,因此我无法将整个数组存储在内存中:

num_steps = 20
num_times = 100
#Create DataArray
d = xr.DataArray(np.zeros([num_steps, num_times], np.float32),
                 {'Step': np.arange(num_steps),
                  'Time': np.arange(num_times)},
                 ('Step', 'Time'))
#Computatation
for i in range(num_steps):
    d[i, :] = i
#Write to file
d.to_netcdf('test.nc')

所以我不想在内存中创建整个 NumPy 数组,我希望计算和写入文件阶段一次完成一个块(在此示例中,在步骤维度上分块) .

更新: 似乎(来自@jhamman 的回答)可能无法使用 xarray 实现我上面的示例。我主要感兴趣的是通过 xarray 加深对核外计算的理解,因此我没有要问的具体计算,但是,由于有人要求我提供一个更复杂的示例,因此我可以考虑一个潜在的应用程序有是:

for i in range(num_steps):
    u[:] = f(u)
    s[:] = g(s)
    d[i, :] = u[:] * s[:]

其中 us 是时间维度的 xr.DataArrays,fg 是仅依赖于输入数组的 PDE 求解器上一步。假设有 1000 步,但是时间维度太大,我只能在内存中存储一​​两个,所以对 d 的赋值必须写入磁盘,然后释放相关内存。

Dask 数组目前不支持项目分配,请参阅

所以如果 d 是一个 xarray.DataArray 并且在引擎盖下有一个 dask.array,那么这将不起作用。

此外,none 当前的 Xarray 后端支持分块写入。 编辑:正如@shoyer 指出的那样,可以让 xarray 增量写入分块数组。但是,对于您在这里的用例,由于您似乎需要项目分配,因此可能需要直接使用 netCDF4-python 库:

from netCDF4 import Dataset

f = Dataset('test.nc', mode='w')
f.createDimension("Step", nsteps)
f.createDimension("time", ntimes)
d = f.createVariable("d", "f4",("Step", "time"))

#Computatation
for i in range(num_steps):
    d[i, :] = i

我假设您的计算比您的示例更复杂,因此您可能会考虑将 = i 替换为使用 xarray/dask.

的内容

是的,xarray 支持核外数组和块写入。您将需要使用 xarray 操作编写计算,Dask arrays instead of NumPy arrays. The xarray docs 在这里应该会有帮助。

更新:对于这样的模拟,您需要使用 dask.delayed 计算每个函数 f。然后,您可以使用 dask.array.from_delayed 将结果转换为 dask 数组,将它们包装回 xarray.DataArray 并使用 to_netcdf() 将数据直接写入磁盘。结果以流式方式进行,f()g() 并行计算,并且加载到内存中的时间步长不超过几个:

import dask
import dask.array as da
import numpy as np
import xarray

def f(x):
    return 1.1 * x

def g(x):
    return 0.9 * x

num_steps = 1000
num_times = int(1e6)

u = np.ones(num_times)
s = np.ones(num_times)

arrays = []
for i in range(num_steps):
    u = dask.delayed(f)(u)
    s = dask.delayed(g)(s)
    product = da.from_delayed(u * s, shape=(num_times,), dtype=float)
    arrays.append(product)

stacked = da.stack(arrays)
data_array = xarray.DataArray(stacked, dims=['step', 'time'])
%time data_array.to_netcdf('results.nc')
# CPU times: user 7.44 s, sys: 13.5 s, total: 20.9 s
# Wall time: 29.4 s

您会注意到 xarray 是此计算的外围设备:大部分计算是使用 dask/numpy 完成的。您也可以使用 xarray 对象轻松执行此操作,但我们没有方便的方法通过 dask 延迟对象传递带标签的数组元数据,因此无论哪种方式,您都需要在另一端重建元数据。

您可能会争辩说,在这里使用 dask 有点矫枉过正,您可能是对的。即使您想使用 dask 进行并行化,您仍然可能希望在每一步之后以有效的 netCDF 文件的形式检查模拟。

因此,您可能需要一个在每次迭代时扩展 netCDF 文件的简单循环。这是 xarray 的 not yet supported,但这将是一个不错的功能。像下面这样的界面应该是可能的:

for i in range(num_steps):
    u[:] = f(u)
    s[:] = g(s)
    d[:] = u[:] * s[:]
    d.to_netcdf('results.nc', extend='step')

同时,您可以为每个步骤编写单独的文件,例如,

for i in range(num_steps):
    u[:] = f(u)
    s[:] = g(s)
    d[:] = u[:] * s[:]
    d.to_netcdf('results-%04d.nc' % i)

然后您可以将所有数据一起加载,然后使用 open_mfdataset 将其合并到一个文件中,例如,

combined = xarray.open_mfdataset('results-*.nc', autoclose=True)
combined.to_netcdf('results-combined.nc')