使用 Dask 和 Xarray 进行并行计算
Parallel computation with Dask and Xarray
我有以下功能
@dask.delayed
def load_ds(p):
import xarray as xr
multi_file_dataset = xr.open_mfdataset(p, combine='by_coords', concat_dim="time", parallel=True)
mean = multi_file_dataset['tas'].mean(dim='time')
return mean
打开一组 NetCDF 文件(由路径 p 标识)并计算随时间变化的平均值。
我正在尝试 运行 通过两个不同的路径(= 数据集)并行执行函数:
results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
results = dask.compute(*results)
我也试过了
results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
futures = dask.persist(*results)
results = dask.compute(*futures)
但是,我注意到当我尝试检索结果时执行实际上开始了:
print(results[0].values)
再次,当我取回第二个时
print(results[1].values)
怎么了?有没有办法只检索一次结果对象?
鉴于您目前所做的,那么:
delayed_task = dask.delayed(
lambda L: (L[0].values, L[1].values)
)(results)
和“稍后”[=17=],
tup = delayed_task.compute()
我有以下功能
@dask.delayed
def load_ds(p):
import xarray as xr
multi_file_dataset = xr.open_mfdataset(p, combine='by_coords', concat_dim="time", parallel=True)
mean = multi_file_dataset['tas'].mean(dim='time')
return mean
打开一组 NetCDF 文件(由路径 p 标识)并计算随时间变化的平均值。
我正在尝试 运行 通过两个不同的路径(= 数据集)并行执行函数:
results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
results = dask.compute(*results)
我也试过了
results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
futures = dask.persist(*results)
results = dask.compute(*futures)
但是,我注意到当我尝试检索结果时执行实际上开始了:
print(results[0].values)
再次,当我取回第二个时
print(results[1].values)
怎么了?有没有办法只检索一次结果对象?
鉴于您目前所做的,那么:
delayed_task = dask.delayed(
lambda L: (L[0].values, L[1].values)
)(results)
和“稍后”[=17=],
tup = delayed_task.compute()