分块创建 xarray DataArray 并将其写入 NetCDF
Create and write xarray DataArray to NetCDF in chunks
是否也可以创建一个核外 DataArray,并使用 xarray 将其逐块写入 NetCDF4 文件?
例如,我希望能够在维度更大时以核外方式执行此操作,因此我无法将整个数组存储在内存中:
num_steps = 20
num_times = 100
#Create DataArray
d = xr.DataArray(np.zeros([num_steps, num_times], np.float32),
{'Step': np.arange(num_steps),
'Time': np.arange(num_times)},
('Step', 'Time'))
#Computatation
for i in range(num_steps):
d[i, :] = i
#Write to file
d.to_netcdf('test.nc')
所以我不想在内存中创建整个 NumPy 数组,我希望计算和写入文件阶段一次完成一个块(在此示例中,在步骤维度上分块) .
更新:
似乎(来自@jhamman 的回答)可能无法使用 xarray 实现我上面的示例。我主要感兴趣的是通过 xarray 加深对核外计算的理解,因此我没有要问的具体计算,但是,由于有人要求我提供一个更复杂的示例,因此我可以考虑一个潜在的应用程序有是:
for i in range(num_steps):
u[:] = f(u)
s[:] = g(s)
d[i, :] = u[:] * s[:]
其中 u
和 s
是时间维度的 xr.DataArrays,f
和 g
是仅依赖于输入数组的 PDE 求解器上一步。假设有 1000 步,但是时间维度太大,我只能在内存中存储一两个,所以对 d
的赋值必须写入磁盘,然后释放相关内存。
Dask 数组目前不支持项目分配,请参阅 。
所以如果 d
是一个 xarray.DataArray
并且在引擎盖下有一个 dask.array,那么这将不起作用。
此外,none 当前的 Xarray 后端支持分块写入。 编辑:正如@shoyer 指出的那样,可以让 xarray 增量写入分块数组。但是,对于您在这里的用例,由于您似乎需要项目分配,因此可能需要直接使用 netCDF4-python
库:
from netCDF4 import Dataset
f = Dataset('test.nc', mode='w')
f.createDimension("Step", nsteps)
f.createDimension("time", ntimes)
d = f.createVariable("d", "f4",("Step", "time"))
#Computatation
for i in range(num_steps):
d[i, :] = i
我假设您的计算比您的示例更复杂,因此您可能会考虑将 = i
替换为使用 xarray/dask.
的内容
是的,xarray 支持核外数组和块写入。您将需要使用 xarray 操作编写计算,Dask arrays instead of NumPy arrays. The xarray docs 在这里应该会有帮助。
更新:对于这样的模拟,您需要使用 dask.delayed 计算每个函数 f
。然后,您可以使用 dask.array.from_delayed
将结果转换为 dask 数组,将它们包装回 xarray.DataArray
并使用 to_netcdf()
将数据直接写入磁盘。结果以流式方式进行,f()
和 g()
并行计算,并且加载到内存中的时间步长不超过几个:
import dask
import dask.array as da
import numpy as np
import xarray
def f(x):
return 1.1 * x
def g(x):
return 0.9 * x
num_steps = 1000
num_times = int(1e6)
u = np.ones(num_times)
s = np.ones(num_times)
arrays = []
for i in range(num_steps):
u = dask.delayed(f)(u)
s = dask.delayed(g)(s)
product = da.from_delayed(u * s, shape=(num_times,), dtype=float)
arrays.append(product)
stacked = da.stack(arrays)
data_array = xarray.DataArray(stacked, dims=['step', 'time'])
%time data_array.to_netcdf('results.nc')
# CPU times: user 7.44 s, sys: 13.5 s, total: 20.9 s
# Wall time: 29.4 s
您会注意到 xarray 是此计算的外围设备:大部分计算是使用 dask/numpy 完成的。您也可以使用 xarray 对象轻松执行此操作,但我们没有方便的方法通过 dask 延迟对象传递带标签的数组元数据,因此无论哪种方式,您都需要在另一端重建元数据。
您可能会争辩说,在这里使用 dask 有点矫枉过正,您可能是对的。即使您想使用 dask 进行并行化,您仍然可能希望在每一步之后以有效的 netCDF 文件的形式检查模拟。
因此,您可能需要一个在每次迭代时扩展 netCDF 文件的简单循环。这是 xarray 的 not yet supported,但这将是一个不错的功能。像下面这样的界面应该是可能的:
for i in range(num_steps):
u[:] = f(u)
s[:] = g(s)
d[:] = u[:] * s[:]
d.to_netcdf('results.nc', extend='step')
同时,您可以为每个步骤编写单独的文件,例如,
for i in range(num_steps):
u[:] = f(u)
s[:] = g(s)
d[:] = u[:] * s[:]
d.to_netcdf('results-%04d.nc' % i)
然后您可以将所有数据一起加载,然后使用 open_mfdataset
将其合并到一个文件中,例如,
combined = xarray.open_mfdataset('results-*.nc', autoclose=True)
combined.to_netcdf('results-combined.nc')
是否也可以创建一个核外 DataArray,并使用 xarray 将其逐块写入 NetCDF4 文件?
例如,我希望能够在维度更大时以核外方式执行此操作,因此我无法将整个数组存储在内存中:
num_steps = 20
num_times = 100
#Create DataArray
d = xr.DataArray(np.zeros([num_steps, num_times], np.float32),
{'Step': np.arange(num_steps),
'Time': np.arange(num_times)},
('Step', 'Time'))
#Computatation
for i in range(num_steps):
d[i, :] = i
#Write to file
d.to_netcdf('test.nc')
所以我不想在内存中创建整个 NumPy 数组,我希望计算和写入文件阶段一次完成一个块(在此示例中,在步骤维度上分块) .
更新: 似乎(来自@jhamman 的回答)可能无法使用 xarray 实现我上面的示例。我主要感兴趣的是通过 xarray 加深对核外计算的理解,因此我没有要问的具体计算,但是,由于有人要求我提供一个更复杂的示例,因此我可以考虑一个潜在的应用程序有是:
for i in range(num_steps):
u[:] = f(u)
s[:] = g(s)
d[i, :] = u[:] * s[:]
其中 u
和 s
是时间维度的 xr.DataArrays,f
和 g
是仅依赖于输入数组的 PDE 求解器上一步。假设有 1000 步,但是时间维度太大,我只能在内存中存储一两个,所以对 d
的赋值必须写入磁盘,然后释放相关内存。
Dask 数组目前不支持项目分配,请参阅
所以如果 d
是一个 xarray.DataArray
并且在引擎盖下有一个 dask.array,那么这将不起作用。
此外,none 当前的 Xarray 后端支持分块写入。 编辑:正如@shoyer 指出的那样,可以让 xarray 增量写入分块数组。但是,对于您在这里的用例,由于您似乎需要项目分配,因此可能需要直接使用 netCDF4-python
库:
from netCDF4 import Dataset
f = Dataset('test.nc', mode='w')
f.createDimension("Step", nsteps)
f.createDimension("time", ntimes)
d = f.createVariable("d", "f4",("Step", "time"))
#Computatation
for i in range(num_steps):
d[i, :] = i
我假设您的计算比您的示例更复杂,因此您可能会考虑将 = i
替换为使用 xarray/dask.
是的,xarray 支持核外数组和块写入。您将需要使用 xarray 操作编写计算,Dask arrays instead of NumPy arrays. The xarray docs 在这里应该会有帮助。
更新:对于这样的模拟,您需要使用 dask.delayed 计算每个函数 f
。然后,您可以使用 dask.array.from_delayed
将结果转换为 dask 数组,将它们包装回 xarray.DataArray
并使用 to_netcdf()
将数据直接写入磁盘。结果以流式方式进行,f()
和 g()
并行计算,并且加载到内存中的时间步长不超过几个:
import dask
import dask.array as da
import numpy as np
import xarray
def f(x):
return 1.1 * x
def g(x):
return 0.9 * x
num_steps = 1000
num_times = int(1e6)
u = np.ones(num_times)
s = np.ones(num_times)
arrays = []
for i in range(num_steps):
u = dask.delayed(f)(u)
s = dask.delayed(g)(s)
product = da.from_delayed(u * s, shape=(num_times,), dtype=float)
arrays.append(product)
stacked = da.stack(arrays)
data_array = xarray.DataArray(stacked, dims=['step', 'time'])
%time data_array.to_netcdf('results.nc')
# CPU times: user 7.44 s, sys: 13.5 s, total: 20.9 s
# Wall time: 29.4 s
您会注意到 xarray 是此计算的外围设备:大部分计算是使用 dask/numpy 完成的。您也可以使用 xarray 对象轻松执行此操作,但我们没有方便的方法通过 dask 延迟对象传递带标签的数组元数据,因此无论哪种方式,您都需要在另一端重建元数据。
您可能会争辩说,在这里使用 dask 有点矫枉过正,您可能是对的。即使您想使用 dask 进行并行化,您仍然可能希望在每一步之后以有效的 netCDF 文件的形式检查模拟。
因此,您可能需要一个在每次迭代时扩展 netCDF 文件的简单循环。这是 xarray 的 not yet supported,但这将是一个不错的功能。像下面这样的界面应该是可能的:
for i in range(num_steps):
u[:] = f(u)
s[:] = g(s)
d[:] = u[:] * s[:]
d.to_netcdf('results.nc', extend='step')
同时,您可以为每个步骤编写单独的文件,例如,
for i in range(num_steps):
u[:] = f(u)
s[:] = g(s)
d[:] = u[:] * s[:]
d.to_netcdf('results-%04d.nc' % i)
然后您可以将所有数据一起加载,然后使用 open_mfdataset
将其合并到一个文件中,例如,
combined = xarray.open_mfdataset('results-*.nc', autoclose=True)
combined.to_netcdf('results-combined.nc')