与 "basic" 分块架构相比,为什么 "optimized" 分块会增加 dask 任务的数量?

Why does the amount of dask task increase with an "optimized" chunking compared to a "basic" chunking schema?

我正在尝试了解不同的分块模式如何使用 xarray 和 dask 加快或减慢我的计算速度。
我已阅读 dask and xarray 指南,但我可能遗漏了一些内容来理解这一点。

问题

我有 2 个内容相同但分块不同的存储空间。
两者都包含一个数据变量 tasmax 以及用 xarray 打开它所必需的坐标变量和元数据。 tasmax 形状是 <xarray.DataArray 'tasmax' (time: 3660, lat: 256, lon: 512)>

第一个存储是zarr存储zarr_init,我用netCDF文件制作,每年1个文件,10个.nc文件。 当用 xarray 打开它时,我得到一个 chunksize=(366, 256, 512) 的分块模式,因此每个块 1 年,与初始 netCDF 存储相同。 每个块大约 191MB。

第二个存储,zarr_time_opti也是一个zarr存储,但是,在时间维度上没有分块。 当我用 xarray 打开它并检查 tasmax 时,它的分块模式是 chunksize=(3660, 114, 115)。 每个块也约为 191MB。

天真地,我希望空间独立的计算 运行 比 zarr_init 更快,并且在 zarr_time_opti 上生成的任务要少得多。 然而,我观察到完全相反的情况:

当基于 groupby("time.month") 计算相同的微积分时,我用 zarr_time_opti 得到 2370 个任务,用 zarr_init 只有 570 个任务。正如您在下面的 MRE 中看到的那样,这与 zarr 本身无关,因为我只能使用 xarray 和 dask 重现该问题。

所以我的问题是:

MRE

def simple_climate_index(da):
    import time
    time_start = time.perf_counter()
    # computations
    res =( da.groupby("time.month") - da.groupby("time.month").mean("time")).compute()
    # summer_days = (da > 25).resample(time="MS").sum().compute()
    time_elapsed = time.perf_counter() - time_start
    print(f"wall time: {time_elapsed} secs")


def mre_so():
    import distributed
    import pandas as pd
    import numpy as np
    client = distributed.Client(memory_limit="16GB", n_workers=1, threads_per_worker=4)
    tasmax = xr.DataArray(
        data=np.empty((3660, 256, 512), dtype=float),
        dims=["time", "lat", "lon"],
        coords=dict(
            time=pd.date_range("2042-01-01", periods=3660, freq="D"),
            lat=np.arange(256),
            lon=np.arange(512),
        ),
        name="tasmax",
        attrs={"units": "degC"},
    )
    da_optimized = tasmax.copy(deep=True).chunk(dict(time=-1, lat=114, lon=115))
    simple_climate_index(da_optimized)
    # wall time: ~47 secs - 2370 tasks (observed on client)
    da_init = tasmax.copy(deep=True).chunk(dict(time=366, lat=-1, lon=-1))
    simple_climate_index(da_init)
    # wall time: ~37 secs - 570 tasks (observed on client)

if __name__ == "__main__":
    mre_so()

备注

What is the mechanism with xarray or dask which create that many tasks ?

da_optimized 的情况下,您似乎在 latlon 维度上分块,而在 da_init 中,您只在 da_init 上分块time维度。

da_optimized:

da_init:

当您进行计算时,一开始,每个任务将对应一个块。


关于您的具体示例的旁注

  1. da_optimized 从 15 个区块开始,da_init 从 10 个区块开始,这会减少 da_init 中的总体任务。因此,为了平衡它们,我将其修改为:
da_optimized = tasmax.copy(deep=True).chunk(dict(time=-1, lat=128, lon=103))
  1. 执行时,xarray 显示此警告:PerformanceWarning: Slicing with an out-of-order index is generating 11 times more chunks。所以,我将 simple_climate_index 中的计算简化为:
res = da.groupby("time.month").mean("time").compute()

最好的分块技术取决于您正在执行的操作。

对于在 pandas 中常见的 groupby 操作,我可以理解为什么 da_init 的任务更少,速度更快。对于任何给定的时间戳,所有 lat+lon 数据都保存在一个块中。 (此外,在这种情况下,Dask 可以根据 groups 优化块数。例如,你是 grouping-by 个月,所以即使你从 100 个块开始,你最终也会得到 12组,可能存储为 one-group-per-chunk,因此,总共 12 个块。我不确定 xarray 是否真的进行了这种优化,我只是说这是可能的。)

da_optimized中,一个groupby会需要chunk之间的通信,因为lat+lon数据分布在不同的chunk中,这会导致更多的任务,因此性能下降。

以下是两个操作的(任务)图形可视化:

da_optimized:

da_init:


Then, what would be the strategy to find the best chunking schema ?

由于您在“时间”上执行 groupby(),如果您沿着相同的(时间)维度分块,任务图将是最有效的。