使用 Dask 和 Xarray 进行并行计算

Parallel computation with Dask and Xarray

我有以下功能

@dask.delayed
def load_ds(p):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(p, combine='by_coords', concat_dim="time", parallel=True)
    mean = multi_file_dataset['tas'].mean(dim='time')
    return mean

打开一组 NetCDF 文件(由路径 p 标识)并计算随时间变化的平均值。

我正在尝试 运行 通过两个不同的路径(= 数据集)并行执行函数:

results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
   
results = dask.compute(*results)

我也试过了

results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
  
futures = dask.persist(*results)
results = dask.compute(*futures)

但是,我注意到当我尝试检索结果时执行实际上开始了:

 print(results[0].values)

再次,当我取回第二个时

 print(results[1].values)

怎么了?有没有办法只检索一次结果对象?

鉴于您目前所做的,那么:

delayed_task = dask.delayed(
    lambda L: (L[0].values, L[1].values)
)(results)

“稍后”[=1​​7=],

tup = delayed_task.compute()