与 "basic" 分块架构相比,为什么 "optimized" 分块会增加 dask 任务的数量?
Why does the amount of dask task increase with an "optimized" chunking compared to a "basic" chunking schema?
我正在尝试了解不同的分块模式如何使用 xarray 和 dask 加快或减慢我的计算速度。
我已阅读 dask and xarray 指南,但我可能遗漏了一些内容来理解这一点。
问题
我有 2 个内容相同但分块不同的存储空间。
两者都包含一个数据变量 tasmax 以及用 xarray 打开它所必需的坐标变量和元数据。
tasmax 形状是 <xarray.DataArray 'tasmax' (time: 3660, lat: 256, lon: 512)>
第一个存储是zarr存储zarr_init
,我用netCDF文件制作,每年1个文件,10个.nc文件。
当用 xarray 打开它时,我得到一个 chunksize=(366, 256, 512)
的分块模式,因此每个块 1 年,与初始 netCDF 存储相同。
每个块大约 191MB。
第二个存储,zarr_time_opti
也是一个zarr存储,但是,在时间维度上没有分块。
当我用 xarray 打开它并检查 tasmax
时,它的分块模式是 chunksize=(3660, 114, 115)
。
每个块也约为 191MB。
天真地,我希望空间独立的计算 运行 比 zarr_init
更快,并且在 zarr_time_opti
上生成的任务要少得多。
然而,我观察到完全相反的情况:
当基于 groupby("time.month")
计算相同的微积分时,我用 zarr_time_opti
得到 2370 个任务,用 zarr_init
只有 570 个任务。正如您在下面的 MRE 中看到的那样,这与 zarr 本身无关,因为我只能使用 xarray 和 dask 重现该问题。
所以我的问题是:
- 创建那么多任务的 xarray 或 dask 的机制是什么?
- 那么,找到最佳分块模式的策略是什么?
MRE
def simple_climate_index(da):
import time
time_start = time.perf_counter()
# computations
res =( da.groupby("time.month") - da.groupby("time.month").mean("time")).compute()
# summer_days = (da > 25).resample(time="MS").sum().compute()
time_elapsed = time.perf_counter() - time_start
print(f"wall time: {time_elapsed} secs")
def mre_so():
import distributed
import pandas as pd
import numpy as np
client = distributed.Client(memory_limit="16GB", n_workers=1, threads_per_worker=4)
tasmax = xr.DataArray(
data=np.empty((3660, 256, 512), dtype=float),
dims=["time", "lat", "lon"],
coords=dict(
time=pd.date_range("2042-01-01", periods=3660, freq="D"),
lat=np.arange(256),
lon=np.arange(512),
),
name="tasmax",
attrs={"units": "degC"},
)
da_optimized = tasmax.copy(deep=True).chunk(dict(time=-1, lat=114, lon=115))
simple_climate_index(da_optimized)
# wall time: ~47 secs - 2370 tasks (observed on client)
da_init = tasmax.copy(deep=True).chunk(dict(time=366, lat=-1, lon=-1))
simple_climate_index(da_init)
# wall time: ~37 secs - 570 tasks (observed on client)
if __name__ == "__main__":
mre_so()
备注
zarr_time_opti
是通过使用 rechunker 重新分块 zarr_init
获得的,这是一个有效重写不同分块模式的库。
- 实际上,我正在通过计算(例如)每个像素上 30 年的第 90 个每日百分位数来进行时间序列分析,然后再次计算每个像素上 tasmax 与该百分位数相比的超出率。
在这种情况下,使用大约 100 年,当时间被分块时,我得到大约 2000 个任务,当时间 not 分块时,我得到大约 85000 个任务。
What is the mechanism with xarray or dask which create that many tasks ?
在 da_optimized
的情况下,您似乎在 lat
和 lon
维度上分块,而在 da_init
中,您只在 da_init
上分块time
维度。
da_optimized
:
da_init
:
当您进行计算时,一开始,每个任务将对应一个块。
关于您的具体示例的旁注:
da_optimized
从 15 个区块开始,da_init
从 10 个区块开始,这会减少 da_init
中的总体任务。因此,为了平衡它们,我将其修改为:
da_optimized = tasmax.copy(deep=True).chunk(dict(time=-1, lat=128, lon=103))
- 执行时,xarray 显示此警告:
PerformanceWarning: Slicing with an out-of-order index is generating 11 times more chunks
。所以,我将 simple_climate_index
中的计算简化为:
res = da.groupby("time.month").mean("time").compute()
最好的分块技术取决于您正在执行的操作。
对于在 pandas 中常见的 groupby
操作,我可以理解为什么 da_init
的任务更少,速度更快。对于任何给定的时间戳,所有 lat
+lon
数据都保存在一个块中。 (此外,在这种情况下,Dask 可以根据 groups
优化块数。例如,你是 grouping-by 个月,所以即使你从 100 个块开始,你最终也会得到 12组,可能存储为 one-group-per-chunk,因此,总共 12 个块。我不确定 xarray 是否真的进行了这种优化,我只是说这是可能的。)
在da_optimized
中,一个groupby
会需要chunk之间的通信,因为lat
+lon
数据分布在不同的chunk中,这会导致更多的任务,因此性能下降。
以下是两个操作的(任务)图形可视化:
da_optimized
:
da_init
:
Then, what would be the strategy to find the best chunking schema ?
由于您在“时间”上执行 groupby()
,如果您沿着相同的(时间)维度分块,任务图将是最有效的。
我正在尝试了解不同的分块模式如何使用 xarray 和 dask 加快或减慢我的计算速度。
我已阅读 dask and xarray 指南,但我可能遗漏了一些内容来理解这一点。
问题
我有 2 个内容相同但分块不同的存储空间。
两者都包含一个数据变量 tasmax 以及用 xarray 打开它所必需的坐标变量和元数据。
tasmax 形状是 <xarray.DataArray 'tasmax' (time: 3660, lat: 256, lon: 512)>
第一个存储是zarr存储zarr_init
,我用netCDF文件制作,每年1个文件,10个.nc文件。
当用 xarray 打开它时,我得到一个 chunksize=(366, 256, 512)
的分块模式,因此每个块 1 年,与初始 netCDF 存储相同。
每个块大约 191MB。
第二个存储,zarr_time_opti
也是一个zarr存储,但是,在时间维度上没有分块。
当我用 xarray 打开它并检查 tasmax
时,它的分块模式是 chunksize=(3660, 114, 115)
。
每个块也约为 191MB。
天真地,我希望空间独立的计算 运行 比 zarr_init
更快,并且在 zarr_time_opti
上生成的任务要少得多。
然而,我观察到完全相反的情况:
当基于 groupby("time.month")
计算相同的微积分时,我用 zarr_time_opti
得到 2370 个任务,用 zarr_init
只有 570 个任务。正如您在下面的 MRE 中看到的那样,这与 zarr 本身无关,因为我只能使用 xarray 和 dask 重现该问题。
所以我的问题是:
- 创建那么多任务的 xarray 或 dask 的机制是什么?
- 那么,找到最佳分块模式的策略是什么?
MRE
def simple_climate_index(da):
import time
time_start = time.perf_counter()
# computations
res =( da.groupby("time.month") - da.groupby("time.month").mean("time")).compute()
# summer_days = (da > 25).resample(time="MS").sum().compute()
time_elapsed = time.perf_counter() - time_start
print(f"wall time: {time_elapsed} secs")
def mre_so():
import distributed
import pandas as pd
import numpy as np
client = distributed.Client(memory_limit="16GB", n_workers=1, threads_per_worker=4)
tasmax = xr.DataArray(
data=np.empty((3660, 256, 512), dtype=float),
dims=["time", "lat", "lon"],
coords=dict(
time=pd.date_range("2042-01-01", periods=3660, freq="D"),
lat=np.arange(256),
lon=np.arange(512),
),
name="tasmax",
attrs={"units": "degC"},
)
da_optimized = tasmax.copy(deep=True).chunk(dict(time=-1, lat=114, lon=115))
simple_climate_index(da_optimized)
# wall time: ~47 secs - 2370 tasks (observed on client)
da_init = tasmax.copy(deep=True).chunk(dict(time=366, lat=-1, lon=-1))
simple_climate_index(da_init)
# wall time: ~37 secs - 570 tasks (observed on client)
if __name__ == "__main__":
mre_so()
备注
zarr_time_opti
是通过使用 rechunker 重新分块zarr_init
获得的,这是一个有效重写不同分块模式的库。- 实际上,我正在通过计算(例如)每个像素上 30 年的第 90 个每日百分位数来进行时间序列分析,然后再次计算每个像素上 tasmax 与该百分位数相比的超出率。 在这种情况下,使用大约 100 年,当时间被分块时,我得到大约 2000 个任务,当时间 not 分块时,我得到大约 85000 个任务。
What is the mechanism with xarray or dask which create that many tasks ?
在 da_optimized
的情况下,您似乎在 lat
和 lon
维度上分块,而在 da_init
中,您只在 da_init
上分块time
维度。
da_optimized
:
da_init
:
当您进行计算时,一开始,每个任务将对应一个块。
关于您的具体示例的旁注:
da_optimized
从 15 个区块开始,da_init
从 10 个区块开始,这会减少da_init
中的总体任务。因此,为了平衡它们,我将其修改为:
da_optimized = tasmax.copy(deep=True).chunk(dict(time=-1, lat=128, lon=103))
- 执行时,xarray 显示此警告:
PerformanceWarning: Slicing with an out-of-order index is generating 11 times more chunks
。所以,我将simple_climate_index
中的计算简化为:
res = da.groupby("time.month").mean("time").compute()
最好的分块技术取决于您正在执行的操作。
对于在 pandas 中常见的 groupby
操作,我可以理解为什么 da_init
的任务更少,速度更快。对于任何给定的时间戳,所有 lat
+lon
数据都保存在一个块中。 (此外,在这种情况下,Dask 可以根据 groups
优化块数。例如,你是 grouping-by 个月,所以即使你从 100 个块开始,你最终也会得到 12组,可能存储为 one-group-per-chunk,因此,总共 12 个块。我不确定 xarray 是否真的进行了这种优化,我只是说这是可能的。)
在da_optimized
中,一个groupby
会需要chunk之间的通信,因为lat
+lon
数据分布在不同的chunk中,这会导致更多的任务,因此性能下降。
以下是两个操作的(任务)图形可视化:
da_optimized
:
da_init
:
Then, what would be the strategy to find the best chunking schema ?
由于您在“时间”上执行 groupby()
,如果您沿着相同的(时间)维度分块,任务图将是最有效的。