Dask 和 Xarray 的两个数据集之间的区别

Difference between two datasets with Dask and Xarray

我需要使用 Dask 和 Xarray 计算两个数据集(两个按月重新采样的每日变量)之间的差异。这是我的代码:

def diff(path_1,path_2):
    import xarray as xr
    max_v=xr.open_mfdataset(path_1, combine='by_coords', concat_dim="time", parallel=True)['variable_1'].resample({'time': '1M'}).max()
    min_v=xr.open_mfdataset(path_2, combine='by_coords', concat_dim="time", parallel=True)['variable_2'].resample({'time': '1M'}).min()
    
    return (max_v-min_v).compute()
        
future = client.submit(diff,path_1,path_2)
diff = client.gather(future)

我也试过这个:

%%time
def max_var(path):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(path, combine='by_coords', concat_dim="time", parallel=True)
    max_v=multi_file_dataset['variable_1'].resample(time='1M').max(dim='time')
    return max_v.compute()

def min_var(path):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(path, combine='by_coords', concat_dim="time", parallel=True)
    min_v=multi_file_dataset['variable_2'].resample(time='1M').min(dim='time')
    return min_v.compute()

futures=[]
future = client.submit(max_temp,path1)
futures.append(future)
future = client.submit(min_temp,path2)
futures.append(future)
results = client.gather(futures)

diff = results[0]-results[1]

但是我注意到在 getitem-nanmax e getitem-nanmin 的最后一步计算变得非常慢(例如 1974 out of 1980)。

集群配置如下:

cluster = SLURMCluster(walltime='1:00:00',cores=5,memory='5GB')
cluster.scale(jobs=10)

每个数据集由几个文件组成:总大小=7GB

有没有更好的方法来实现这个计算?

谢谢

不能 100% 确定这对您的情况有效,但是没有 mwe 很难做得更好。所以,我怀疑 xarray 使用的 .compute() 可能与 client.submit 冲突,因为现在计算正在工作人员身上进行,我不确定它是否可以正确地分配工作同行(但这是一个怀疑,我不确定)。所以解决这个问题的一种方法是将计算输出到主脚本中(因为 xarray 将在背景中与 dask 集成),所以也许这会起作用:

import xarray as xr

max_v=xr.open_mfdataset(path_1, combine='by_coords', concat_dim="time", parallel=True, chunks={'time': 10})['variable_1'].resample({'time': '1M'}).max()
min_v=xr.open_mfdataset(path_2, combine='by_coords', concat_dim="time", parallel=True, chunks={'time': 10})['variable_2'].resample({'time': '1M'}).min()
    
diff_result = (max_v-min_v).compute()

下面是不同数据集上的 mwe

import xarray as xr

# chunks option will create dask array
ds = xr.tutorial.open_dataset('rasm', decode_times=True, chunks={'time': 10})

# these are lazy calculations
max_v = ds['Tair'].resample({'time': '1M'}).max()
min_v = ds['Tair'].resample({'time': '1M'}).min()

# this will use dask scheduler in the background
diff_result = (max_v-min_v).compute()

# since the data refers to the same variable, all the results will be either 0 or `nan` (if the variable was not available in that time/x/y combination)