Difference between two datasets with Dask and Xarray
I need to compute the difference between two datasets (two daily variables, each resampled to monthly) using Dask and Xarray. Here is my code:
def diff(path_1, path_2):
    import xarray as xr
    max_v = xr.open_mfdataset(path_1, combine='by_coords', concat_dim="time", parallel=True)['variable_1'].resample({'time': '1M'}).max()
    min_v = xr.open_mfdataset(path_2, combine='by_coords', concat_dim="time", parallel=True)['variable_2'].resample({'time': '1M'}).min()
    return (max_v - min_v).compute()

future = client.submit(diff, path_1, path_2)
diff = client.gather(future)
I also tried this:
%%time
def max_var(path):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(path, combine='by_coords', concat_dim="time", parallel=True)
    max_v = multi_file_dataset['variable_1'].resample(time='1M').max(dim='time')
    return max_v.compute()

def min_var(path):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(path, combine='by_coords', concat_dim="time", parallel=True)
    min_v = multi_file_dataset['variable_2'].resample(time='1M').min(dim='time')
    return min_v.compute()

# Submit both reductions to the cluster, then subtract the gathered results
futures = []
future = client.submit(max_var, path1)
futures.append(future)
future = client.submit(min_var, path2)
futures.append(future)
results = client.gather(futures)
diff = results[0] - results[1]
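However, I noticed that the computation becomes very slow during the last steps of the getitem-nanmax and getitem-nanmin tasks (e.g. at 1974 out of 1980).
The cluster is configured as follows: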
cluster = SLURMCluster(walltime='1:00:00',cores=5,memory='5GB')
cluster.scale(jobs=10)
Each dataset consists of several files, with a total size of 7 GB.
Is there a better way to implement this calculation?
Thanks
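I'm not 100% sure this will work for your case, but without an MWE (minimal working example) it's hard to do better. I suspect that the `.compute()` called by xarray might conflict with `client.submit`: the computation is now running on a worker, and I'm not sure that worker can properly distribute the work to its peers (this is only a suspicion, though). One way around this is to move the computation out into the main script, since xarray integrates with dask in the background. So perhaps this will work: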
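import xarray as xr

# Build the lazy monthly reductions in the main script, not inside client.submit.
# Note: recent xarray releases raise an error when concat_dim is passed together
# with combine='by_coords'; drop concat_dim there.
max_v = xr.open_mfdataset(path_1, combine='by_coords', concat_dim="time", parallel=True, chunks={'time': 10})['variable_1'].resample({'time': '1M'}).max()
min_v = xr.open_mfdataset(path_2, combine='by_coords', concat_dim="time", parallel=True, chunks={'time': 10})['variable_2'].resample({'time': '1M'}).min()
diff_result = (max_v - min_v).compute()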
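To illustrate why the computation can stay in the main script, here is a minimal sketch on synthetic data. The array shape, the chunk size and the date range are assumptions made up for the illustration, and the `'MS'` (month-start) frequency is used instead of `'1M'` only because newer pandas versions deprecate the `'M'` alias. Resampling a dask-backed array only builds a task graph; nothing runs until `.compute()` is called, and at that point the active scheduler (a distributed `Client`, if one was created in the session) executes it.

```python
import dask
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical stand-in for one multi-file dataset:
# one year of daily values on a small 4 x 5 grid.
time = pd.date_range("2000-01-01", "2000-12-31", freq="D")
rng = np.random.default_rng(0)
daily = xr.DataArray(
    rng.random((time.size, 4, 5)),
    coords={"time": time},
    dims=("time", "y", "x"),
    name="variable_1",
).chunk({"time": 10})  # chunking makes the array dask-backed

# Lazy step: this only records the work to be done.
monthly_max = daily.resample(time="MS").max()
is_lazy_before = dask.is_dask_collection(monthly_max)

# Eager step: the graph is handed to the current dask scheduler.
result = monthly_max.compute()
is_lazy_after = dask.is_dask_collection(result)

print(is_lazy_before, is_lazy_after, result.shape)
```

If a `Client` connected to the cluster exists in the same session, the same `.compute()` call should send the tasks to the workers without any `client.submit` wrapper.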
Below is an MWE on a different dataset:
import xarray as xr

# the chunks option creates dask arrays
ds = xr.tutorial.open_dataset('rasm', decode_times=True, chunks={'time': 10})
# these are lazy calculations
max_v = ds['Tair'].resample({'time': '1M'}).max()
min_v = ds['Tair'].resample({'time': '1M'}).min()
# this will use the dask scheduler in the background
diff_result = (max_v - min_v).compute()
# Both reductions use the same variable, and 'rasm' holds monthly means, so each
# monthly bin contains a single time step: every result is either 0 or `nan`
# (where the variable was not available for that time/x/y combination).
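The example above yields only zeros and `nan` because both reductions read the same variable. A sketch closer to the original question, assuming two synthetic daily variables in place of the real files, looks like this. The names `variable_1` and `variable_2` come from the question; the grid size, the date range, the offsets of plus and minus 5 and the `'MS'` month-start frequency are assumptions chosen for the illustration. One grid cell is cross-checked against plain pandas.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical daily data: three months on a 2 x 3 grid.
time = pd.date_range("2001-01-01", "2001-03-31", freq="D")
rng = np.random.default_rng(42)
base = rng.normal(10.0, 3.0, size=(time.size, 2, 3))

ds = xr.Dataset(
    {
        # variable_1 is always 10 units above variable_2
        "variable_1": (("time", "y", "x"), base + 5.0),
        "variable_2": (("time", "y", "x"), base - 5.0),
    },
    coords={"time": time},
).chunk({"time": 10})

# Lazy monthly reductions, then one compute() for the whole graph.
max_v = ds["variable_1"].resample(time="MS").max()
min_v = ds["variable_2"].resample(time="MS").min()
diff_result = (max_v - min_v).compute()

# Cross-check a single grid cell with pandas.
s1 = pd.Series(ds["variable_1"].isel(y=0, x=0).values, index=time)
s2 = pd.Series(ds["variable_2"].isel(y=0, x=0).values, index=time)
expected = s1.resample("MS").max() - s2.resample("MS").min()

print(diff_result.isel(y=0, x=0).values)
print(expected.values)
```

Subtracting the two lazy results before calling `.compute()` lets dask schedule both reductions in a single graph instead of gathering two separate results first.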